diff options
| author | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
|---|---|---|
| committer | Robert Gemmell <robbie@apache.org> | 2012-04-17 09:01:44 +0000 |
| commit | 3203eea7641e1b0f39de96d797db7c54423b7f02 (patch) | |
| tree | f2563ba4a85ac54765d8f62663b60853846b3a89 /qpid/java/broker | |
| parent | deab61acfe5f4edaae121cf6b9fa5d4b9e42803f (diff) | |
| download | qpid-python-3203eea7641e1b0f39de96d797db7c54423b7f02.tar.gz | |
QPID-3923: Store queue, exchange and binding as configured objects in bdb store
Applied patch by Oleksandr Rudyy <orudyy@gmail.com>, Phil Harvey <phil@philharveyonline.com>, and myself.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
61 files changed, 2387 insertions, 914 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java index 2d6f7e0946..034a4ae53c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.topic.TopicNormalizer; import org.apache.qpid.server.exchange.topic.TopicParser; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; import org.apache.qpid.server.virtualhost.HouseKeepingTask; @@ -83,7 +84,8 @@ public class ManagementExchange implements Exchange, QMFService.Listener private class ManagementQueue implements BaseQueue { - private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + UUID.randomUUID().toString(); + private final UUID QUEUE_ID = UUIDGenerator.generateUUID(); + private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString(); private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING); public void enqueue(ServerMessage message) throws AMQException @@ -129,9 +131,10 @@ public class ManagementExchange implements Exchange, QMFService.Listener return NAME_AS_SHORT_STRING; } - public String getResourceName() + @Override + public UUID getId() { - return NAME_AS_STRING; + return QUEUE_ID; } } @@ -155,14 +158,14 @@ public class ManagementExchange implements Exchange, QMFService.Listener return ManagementExchange.class; } - public ManagementExchange newInstance(VirtualHost host, + public ManagementExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { ManagementExchange exch = new ManagementExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } @@ -183,7 +186,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener return QPID_MANAGEMENT_TYPE; } - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { if(!QPID_MANAGEMENT.equals(name)) @@ -191,7 +194,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener throw new AMQException("Can't create more than one Management exchange"); } _virtualHost = host; - _id = host.getConfigStore().createId(); + _id = id; _virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost)); getConfigStore().addConfiguredObject(this); getQMFService().addListener(this); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java index 7ef06ce0f8..0f32b98aa8 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java @@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.ManagementActor; import org.apache.qpid.server.management.AMQManagedObject; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.AMQQueueMBean; @@ -48,6 +49,7 @@ import java.io.IOException; import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.UUID; /** * This MBean implements the broker management interface and exposes the @@ -171,8 +173,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName)); if (exchange == null) { - exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type), - durable, false, 0); + exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), + new AMQShortString(type), durable, false, 0); _exchangeRegistry.registerExchange(exchange); if (durable) { @@ -244,45 +246,42 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException { final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName); - AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); - if (queue != null) + synchronized (_queueRegistry) { - throw new JMException("The queue \"" + queueName + "\" already exists."); - } - - CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger())); - try - { - AMQShortString ownerShortString = null; - if (owner != null) + AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString); + if (queue != null) { - ownerShortString = new AMQShortString(owner); + throw new JMException("The queue \"" + queueName + "\" already exists."); } - FieldTable args = null; - if(arguments != null) + CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger())); + try { - args = FieldTable.convertToFieldTable(arguments); - } - final VirtualHost virtualHost = getVirtualHost(); + FieldTable args = null; + if(arguments != null) + { + args = FieldTable.convertToFieldTable(arguments); + } + final VirtualHost virtualHost = getVirtualHost(); + + queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, durable, owner, + false, false, getVirtualHost(), arguments); + if (queue.isDurable() && !queue.isAutoDelete()) + { + getVirtualHost().getMessageStore().createQueue(queue, args); + } - queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString, - false, false, getVirtualHost(), args); - if (queue.isDurable() && !queue.isAutoDelete()) + virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null); + } + catch (AMQException ex) { - getVirtualHost().getMessageStore().createQueue(queue, args); + JMException jme = new JMException(ex.toString()); + throw new MBeanException(jme, "Error in creating queue " + queueName); + } + finally + { + CurrentActor.remove(); } - - virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null); - } - catch (AMQException ex) - { - JMException jme = new JMException(ex.toString()); - throw new MBeanException(jme, "Error in creating queue " + queueName); - } - finally - { - CurrentActor.remove(); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java index 8e44da095a..2efd4cee26 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -117,7 +117,7 @@ public class Binding public String toString() { - return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}"; + return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }"; } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java index 2460be4705..abf252c733 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQInternalException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.framing.AMQShortString; -import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.configuration.BindingConfig; import org.apache.qpid.server.configuration.BindingConfigType; import org.apache.qpid.server.configuration.ConfigStore; @@ -33,11 +32,13 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.messages.BindingMessages; import org.apache.qpid.server.logging.subjects.BindingLogSubject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collections; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class BindingFactory @@ -57,9 +58,9 @@ public class BindingFactory //TODO : persist creation time private long _createTime = System.currentTimeMillis(); - private BindingImpl(String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) + private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) { - super(queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments); + super(id, bindingKey, queue, exchange, arguments); _logSubject = new BindingLogSubject(bindingKey,exchange,queue); } @@ -116,19 +117,19 @@ public class BindingFactory public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { - return makeBinding(bindingKey, queue, exchange, arguments, false, false); + return makeBinding(null, bindingKey, queue, exchange, arguments, false, false); } - public boolean replaceBinding(final String bindingKey, + public boolean replaceBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException { - return makeBinding(bindingKey, queue, exchange, arguments, false, true); + return makeBinding(id, bindingKey, queue, exchange, arguments, false, true); } - private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException + private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException { assert queue != null; final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange(); @@ -163,9 +164,12 @@ public class BindingFactory } } - - BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments); - BindingImpl existingMapping = _bindings.putIfAbsent(b,b); + if (id == null) + { + id = UUIDGenerator.generateUUID(); + } + BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments); + BindingImpl existingMapping = _bindings.putIfAbsent(b, b); if (existingMapping == null || force) { if (existingMapping != null) @@ -175,7 +179,7 @@ public class BindingFactory if (b.isDurable() && !restore) { - _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments)); + _virtualHost.getMessageStore().bindQueue(b); } queue.addQueueDeleteTask(b); @@ -198,9 +202,9 @@ public class BindingFactory return _virtualHost.getConfigStore(); } - public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException + public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException { - makeBinding(bindingKey,queue,exchange,argumentMap,true, false); + makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false); } public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException @@ -239,7 +243,7 @@ public class BindingFactory } } - BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments)); + BindingImpl b = _bindings.remove(new BindingImpl(null, bindingKey,queue,exchange,arguments)); if (b != null) { @@ -250,10 +254,7 @@ public class BindingFactory if (b.isDurable()) { - _virtualHost.getMessageStore().unbindQueue(exchange, - new AMQShortString(bindingKey), - queue, - FieldTable.convertToFieldTable(arguments)); + _virtualHost.getMessageStore().unbindQueue(b); } b.logDestruction(); getConfigStore().removeConfiguredObject(b); @@ -280,7 +281,7 @@ public class BindingFactory arguments = Collections.emptyMap(); } - BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments); + BindingImpl b = new BindingImpl(null, bindingKey,queue,exchange,arguments); return _bindings.get(b); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index af49168a80..9493f400f2 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -114,7 +114,7 @@ public abstract class AbstractExchange implements Exchange, Managable */ protected abstract AbstractExchangeMBean createMBean() throws JMException; - public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { _virtualHost = host; @@ -123,7 +123,7 @@ public abstract class AbstractExchange implements Exchange, Managable _autoDelete = autoDelete; _ticket = ticket; - _id = getConfigStore().createId(); + _id = id; getConfigStore().addConfiguredObject(this); createAndRegisterMBean(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 153419de1b..5058f91995 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -25,9 +25,11 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.AMQUnknownExchangeType; +import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.qmf.ManagementExchange; import org.apache.qpid.server.configuration.VirtualHostConfiguration; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -35,6 +37,7 @@ import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; import java.util.Map; +import java.util.UUID; public class DefaultExchangeFactory implements ExchangeFactory { @@ -76,17 +79,29 @@ public class DefaultExchangeFactory implements ExchangeFactory return publicTypes; } - - public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) - throws AMQException + throws AMQException { return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); } - public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete, - int ticket) + public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) + throws AMQException + { + return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0); + } + + public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) + throws AMQException + { + UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName()); + return createExchange(id, exchange, type, durable, autoDelete, ticket); + } + + public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) throws AMQException { // Check access @@ -102,7 +117,7 @@ public class DefaultExchangeFactory implements ExchangeFactory throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null); } - Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete); + Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete); return e; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java index 33e73b4668..bf4184bf0b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java @@ -30,6 +30,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -174,4 +175,25 @@ public class DefaultExchangeRegistry implements ExchangeRegistry _exchangeMapStr.clear(); } + @Override + public synchronized Exchange getExchange(UUID exchangeId) + { + if (exchangeId == null) + { + return getDefaultExchange(); + } + else + { + Collection<Exchange> exchanges = _exchangeMap.values(); + for (Exchange exchange : exchanges) + { + if (exchange.getId().equals(exchangeId)) + { + return exchange; + } + } + return null; + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 9525324f57..af9322764a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -36,6 +36,7 @@ import javax.management.JMException; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArraySet; @@ -103,14 +104,14 @@ public class DirectExchange extends AbstractExchange return DirectExchange.class; } - public DirectExchange newInstance(VirtualHost host, + public DirectExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { DirectExchange exch = new DirectExchange(); - exch.initialise(host,name,durable,ticket,autoDelete); + exch.initialise(id, host,name,durable,ticket,autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java index 0bcfc3a3da..289cb1a923 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java @@ -36,6 +36,7 @@ import javax.management.JMException; import java.util.Collection; import java.util.List; import java.util.Map; +import java.util.UUID; public interface Exchange extends ExchangeReferrer, ExchangeConfig { @@ -50,7 +51,7 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig AMQShortString getTypeShortString(); - void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) + void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException, JMException; boolean isDurable(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java index 577da79028..aae4ae89bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java @@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import java.util.Collection; +import java.util.UUID; public interface ExchangeFactory @@ -40,4 +41,10 @@ public interface ExchangeFactory Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes(); Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; + + Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException; + + Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable, + boolean autoDelete, int ticket) + throws AMQException; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java index 335efaeaa2..ba4f57a8e0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.exchange; + import org.apache.qpid.AMQException; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java index db244c114b..795ae2e140 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java @@ -24,6 +24,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import java.util.Collection; +import java.util.UUID; public interface ExchangeRegistry @@ -54,4 +55,6 @@ public interface ExchangeRegistry void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException; void clearAndUnregisterMbeans(); + + Exchange getExchange(UUID exchangeId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java index ce339c4e29..a01e41f039 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.exchange; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -29,7 +31,7 @@ public interface ExchangeType<T extends Exchange> { public AMQShortString getName(); public Class<T> getExchangeClass(); - public T newInstance(VirtualHost host, AMQShortString name, + public T newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException; public AMQShortString getDefaultExchangeName(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index f9ad2fad87..5ebcfd095f 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -34,6 +34,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.util.ArrayList; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; public class FanoutExchange extends AbstractExchange @@ -65,14 +66,14 @@ public class FanoutExchange extends AbstractExchange return FanoutExchange.class; } - public FanoutExchange newInstance(VirtualHost host, + public FanoutExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { FanoutExchange exch = new FanoutExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 2700a7cda3..16ba3c0431 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -37,6 +37,7 @@ import javax.management.JMException; import java.util.ArrayList; import java.util.LinkedHashSet; import java.util.Map; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.CopyOnWriteArraySet; @@ -93,12 +94,12 @@ public class HeadersExchange extends AbstractExchange return HeadersExchange.class; } - public HeadersExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket, + public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { HeadersExchange exch = new HeadersExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index 7ce84b7a89..7ea7a41826 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -27,6 +27,7 @@ import java.util.Collections; import java.util.HashMap; import java.util.HashSet; import java.util.Map; +import java.util.UUID; import java.util.WeakHashMap; import java.util.concurrent.ConcurrentHashMap; import javax.management.JMException; @@ -70,14 +71,14 @@ public class TopicExchange extends AbstractExchange return TopicExchange.class; } - public TopicExchange newInstance(VirtualHost host, + public TopicExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete) throws AMQException { TopicExchange exch = new TopicExchange(); - exch.initialise(host, name, durable, ticket, autoDelete); + exch.initialise(id, host, name, durable, ticket, autoDelete); return exch; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java index 4b4bdd4efb..c7046f8e53 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java @@ -767,13 +767,13 @@ public class Bridge implements BridgeConfig try { - _queue = AMQQueueFactory.createAMQQueueImpl(_tmpQueueName, + _queue = AMQQueueFactory.createAMQQueueImpl(null, + _tmpQueueName, isDurable(), _link.getFederationTag(), false, false, - getVirtualHost(), - options); + getVirtualHost(), options); } catch (AMQException e) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java index bb979d5441..49ca934966 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java @@ -134,7 +134,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody> Map<String, Object> oldArgs = oldBinding.getArguments(); if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments))) { - virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments); + virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments); } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 7d993ae14d..396829df91 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -24,6 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.QueueDeclareBody; import org.apache.qpid.framing.QueueDeclareOkBody; @@ -31,6 +32,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -43,6 +45,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collections; +import java.util.Map; import java.util.UUID; public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody> @@ -219,10 +222,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar throws AMQException { final QueueRegistry registry = virtualHost.getQueueRegistry(); - AMQShortString owner = body.getExclusive() ? session.getContextKey() : null; + String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(),virtualHost, body.getArguments()); + Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), AMQShortString.toString(queueName), body.getDurable(), owner, body.getAutoDelete(), + body.getExclusive(),virtualHost, arguments); if (body.getExclusive() && !body.getDurable()) { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java new file mode 100644 index 0000000000..fdb009386c --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java @@ -0,0 +1,75 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public interface Binding extends ConfiguredObject +{ + + public String MATCHED_BYTES = "matchedBytes"; + public String MATCHED_MESSAGES = "matchedMessages"; + public String STATE_CHANGED = "stateChanged"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableCollection( + Arrays.asList( + MATCHED_BYTES, + MATCHED_MESSAGES, + STATE_CHANGED)); + + + public String ARGUMENTS = "arguments"; + public String CREATED = "created"; + public String DURABLE = "durable"; + public String ID = "id"; + public String LIFETIME_POLICY = "lifetimePolicy"; + public String NAME = "name"; + public String STATE = "state"; + public String TIME_TO_LIVE = "timeToLive"; + public String UPDATED = "updated"; + public String QUEUE = "queue"; + public String EXCHANGE = "exchange"; + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableCollection( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + EXCHANGE, + QUEUE, + ARGUMENTS) + ); + + + + Map<String,Object> getArguments(); + + void delete(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java new file mode 100644 index 0000000000..6477633a9b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java @@ -0,0 +1,38 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public interface ConfigurationChangeListener +{ + /** + * Inform the listener that the passed object has changed state + * + * @param object the object whose state has changed + * @param oldState the state prior to the change + * @param newState the state after the change + */ + void stateChanged(ConfiguredObject object, State oldState, State newState); + + void childAdded(ConfiguredObject object, ConfiguredObject child); + + void childRemoved(ConfiguredObject object, ConfiguredObject child); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java new file mode 100644 index 0000000000..fb47a54d0a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -0,0 +1,229 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.security.AccessControlException; +import java.util.Collection; +import java.util.UUID; + +public interface ConfiguredObject +{ + + /** + * Get the universally unique identifier for the object + * + * @return the objects id + */ + UUID getId(); + + /** + * Get the name of the object + * + * @return the name of the object + */ + String getName(); + + + /** + * Attempt to change the name of the object + * + * Request a change to the name of the object. The caller must pass in the name it believes the object currently + * has. If the current name differes from this expected value, then no name change will occur + * + * @param currentName the name the caller believes the object to have + * @param desiredName the name the caller would like the object to have + * @return the new name for the object + * @throws IllegalStateException if the name of the object may not be changed in in the current state + * @throws AccessControlException if the current context does not have permission to change the name + * @throws IllegalArgumentException if the provided name is not legal + * @throws NullPointerException if the desired name is null + */ + String setName(String currentName, String desiredName) throws IllegalStateException, + AccessControlException; + + + /** + * Get the desired state of the object. + * + * This is the state set at the object itself, however the object + * may not be able attain this state if one of its ancestors is in a different state (in particular a descendant + * object may not be ACTIVE if all of its ancestors are not also ACTIVE). + * + * @return the desired state of the object + */ + State getDesiredState(); + + /** + * Change the desired state of the object + * + * Request a change to the current state. The caller must pass in the state it believe the object to be in, if + * this differs from the current desired state when the object evalues the request, then no state change will occur. + * + * @param currentState the state the caller believes the object to be in + * @param desiredState the state the caller wishes the object to attain + * @return the new current state + * @throws IllegalStateTransitionException the requested state tranisition is invalid + * @throws AccessControlException the current context does not have sufficeint permissions to change the state + */ + State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException, + AccessControlException; + + /** + * Get the actual state of the object. + * + * This state is derived fromt the desired state of the object itself and + * the actual state of its parents. If an object "desires" to be ACTIVE, but one of its parents is STOPPED, then + * the actual state of the object will be STOPPED + * + * @return the actual state of the object + */ + State getActualState(); + + + /** + * Add a listener which will be informed of all changes to this configuration object + * + * @param listener the listener to add + */ + void addChangeListener(ConfigurationChangeListener listener); + + /** + * Remove a change listener + * + * + * @param listener the listener to remove + * @return true iff a listener was removed + */ + boolean removeChangeListener(ConfigurationChangeListener listener); + + /** + * Get the parent of the given type for this object + * + * @param clazz the class of parent being asked for + * @return the objects parent + */ + <T extends ConfiguredObject> T getParent(Class<T> clazz); + + + /** + * Returns whether the the object configuration is durably stored + * + * @return the durablity + */ + boolean isDurable(); + + /** + * Sets the durability of the object + * + * @param durable true iff the caller wishes the object to store its configuration durably + * + * @throws IllegalStateException if the durability cannot be changed in the current state + * @throws AccessControlException if the current context does not have sufficient permission to change the durability + * @throws IllegalArgumentException if the object does not support the requested durability + */ + void setDurable(boolean durable) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Return the lifetime policy for the object + * + * @return the lifetime policy + */ + LifetimePolicy getLifetimePolicy(); + + /** + * Set the lifetime policy of the object + * + * @param expected The lifetime policy the caller believes the object currently has + * @param desired The lifetime policy the caller desires the object to have + * @return the new lifetime policy + * @throws IllegalStateException if the lifetime policy cannot be changed in the current state + * @throws AccessControlException if the caller does not have permission to change the lifetime policy + * @throws IllegalArgumentException if the object does not support the requested lifetime policy + */ + LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Get the time the object will live once the lifetime policy conditions are no longer fulfilled + * + * @return the time to live + */ + long getTimeToLive(); + + /** + * Set the ttl value + * + * @param expected the ttl the caller believes the object currently has + * @param desired the ttl value the caller + * @return the new ttl value + * @throws IllegalStateException if the ttl cannot be set in the current state + * @throws AccessControlException if the caller does not have permission to change the ttl + * @throws IllegalArgumentException if the object does not support the requested ttl value + */ + long setTimeToLive(long expected, long desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + /** + * Get the names of attributes that are set on this object + * + * Not that the returned collection is correct at the time the method is called, but will not reflect future + * additions or removals when they occur + * + * @return the collection of attribute names + */ + Collection<String> getAttributeNames(); + + + /** + * Return the value for the given attribute + * + * @param name the name of the attribute + * @return the value of the attribute at the object (or null if the attribute is not set + */ + Object getAttribute(String name); + + /** + * Set the value of an attribute + * + * @param name the name of the attribute to be set + * @param expected the value the caller believes the attribute currently has (or null if it is expected to be unset) + * @param desired the desired value for the attribute (or null to unset the attribute) + * @return the new value for the given attribute + * @throws IllegalStateException if the attribute cannot be set while the object is in its current state + * @throws AccessControlException if the caller does not have permission to alter the value of the attribute + * @throws IllegalArgumentException if the provided value is not valid for the given argument + */ + Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException, + AccessControlException, + IllegalArgumentException; + + + /** + * Return the Statistics holder for the ConfiguredObject + * + * @return the Statistics holder for the ConfiguredObject (or null if none exists) + */ + Statistics getStatistics(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java new file mode 100644 index 0000000000..958177e713 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java @@ -0,0 +1,73 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; + +public interface Consumer extends ConfiguredObject +{ + public String DISTRIBUTION_MODE = "distributionMode"; + public String EXCLUSIVE = "exclusive"; + public String NO_LOCAL = "noLocal"; + public String SELECTOR = "selector"; + public String SETTLEMENT_MODE = "settlementMode"; + public String CREATED = "created"; + public String DURABLE = "durable"; + public String ID = "id"; + public String LIFETIME_POLICY = "lifetimePolicy"; + public String NAME = "name"; + public String STATE = "state"; + public String TIME_TO_LIVE = "timeToLive"; + public String UPDATED = "updated"; + + public Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableCollection( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + DISTRIBUTION_MODE, + SETTLEMENT_MODE, + EXCLUSIVE, + NO_LOCAL, + SELECTOR)); + + public String BYTES_OUT = "bytesOut"; + public String MESSAGES_OUT = "messagesOut"; + public String STATE_CHANGED = "stateChanged"; + public String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes"; + public String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages"; + + public Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableCollection( + Arrays.asList(BYTES_OUT, + MESSAGES_OUT, + STATE_CHANGED, + UNACKNOWLEDGED_BYTES, + UNACKNOWLEDGED_MESSAGES) + ); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java new file mode 100644 index 0000000000..e872273d05 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java @@ -0,0 +1,91 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import java.util.Map; + +public interface Exchange extends ConfiguredObject +{ + String BINDING_COUNT = "bindingCount"; + String BYTES_DROPPED = "bytesDropped"; + String BYTES_IN = "bytesIn"; + String MESSAGES_DROPPED = "messagesDropped"; + String MESSAGES_IN = "messagesIn"; + String PRODUCER_COUNT = "producerCount"; + String STATE_CHANGED = "stateChanged"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableList( + Arrays.asList(BINDING_COUNT, + BYTES_DROPPED, + BYTES_IN, + MESSAGES_DROPPED, + MESSAGES_IN, + PRODUCER_COUNT, + STATE_CHANGED)); + + String CREATED = "created"; + String DURABLE = "durable"; + String ID = "id"; + String LIFETIME_POLICY = "lifetimePolicy"; + String NAME = "name"; + String STATE = "state"; + String TIME_TO_LIVE = "timeToLive"; + String UPDATED = "updated"; + String ALTERNATE_EXCHANGE = "alternateExchange"; + String TYPE = "type"; + + // Attributes + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList( + ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + ALTERNATE_EXCHANGE, + TYPE + )); + + String getExchangeType(); + + //children + Collection<Binding> getBindings(); + Collection<Publisher> getPublishers(); + + //operations + Binding createBinding(String bindingKey, + Queue queue, + Map<String,Object> bindingArguments, + Map<String, Object> attributes); + + + // Statistics + + void delete(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java new file mode 100644 index 0000000000..9cab5e2103 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java @@ -0,0 +1,43 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public class IllegalStateTransitionException extends RuntimeException +{ + public IllegalStateTransitionException() + { + } + + public IllegalStateTransitionException(final String message) + { + super(message); + } + + public IllegalStateTransitionException(final String message, final Throwable cause) + { + super(message, cause); + } + + public IllegalStateTransitionException(final Throwable cause) + { + super(cause); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java new file mode 100644 index 0000000000..c9006f4e71 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum LifetimePolicy +{ + PERMANENT, + AUTO_DELETE +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java new file mode 100644 index 0000000000..cdb85d8023 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java @@ -0,0 +1,25 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public interface Publisher extends ConfiguredObject +{ +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java new file mode 100644 index 0000000000..7c4f0de22b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java @@ -0,0 +1,146 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.Arrays; +import java.util.Collection; +import java.util.Collections; +import org.apache.qpid.server.queue.QueueEntryVisitor; + + +public interface Queue extends ConfiguredObject +{ + public static final String BINDING_COUNT = "bindingCount"; + public static final String CONSUMER_COUNT = "consumerCount"; + public static final String CONSUMER_COUNT_WITH_CREDIT = "consumerCountWithCredit"; + public static final String DISCARDS_TTL_BYTES = "discardsTtlBytes"; + public static final String DISCARDS_TTL_MESSAGES = "discardsTtlMessages"; + public static final String PERSISTENT_DEQUEUED_BYTES = "persistentDequeuedBytes"; + public static final String PERSISTENT_DEQUEUED_MESSAGES = "persistentDequeuedMessages"; + public static final String PERSISTENT_ENQUEUED_BYTES = "persistentEnqueuedBytes"; + public static final String PERSISTENT_ENQUEUED_MESSAGES = "persistentEnqueuedMessages"; + public static final String QUEUE_DEPTH_BYTES = "queueDepthBytes"; + public static final String QUEUE_DEPTH_MESSAGES = "queueDepthMessages"; + public static final String STATE_CHANGED = "stateChanged"; + public static final String TOTAL_DEQUEUED_BYTES = "totalDequeuedBytes"; + public static final String TOTAL_DEQUEUED_MESSAGES = "totalDequeuedMessages"; + public static final String TOTAL_ENQUEUED_BYTES = "totalEnqueuedBytes"; + public static final String TOTAL_ENQUEUED_MESSAGES = "totalEnqueuedMessages"; + public static final String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes"; + public static final String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages"; + + public static final Collection<String> AVAILABLE_STATISTICS = + Collections.unmodifiableList( + Arrays.asList(BINDING_COUNT, + CONSUMER_COUNT, + CONSUMER_COUNT_WITH_CREDIT, + DISCARDS_TTL_BYTES, + DISCARDS_TTL_MESSAGES, + PERSISTENT_DEQUEUED_BYTES, + PERSISTENT_DEQUEUED_MESSAGES, + PERSISTENT_ENQUEUED_BYTES, + PERSISTENT_ENQUEUED_MESSAGES, + QUEUE_DEPTH_BYTES, + QUEUE_DEPTH_MESSAGES, + STATE_CHANGED, + TOTAL_DEQUEUED_BYTES, + TOTAL_DEQUEUED_MESSAGES, + TOTAL_ENQUEUED_BYTES, + TOTAL_ENQUEUED_MESSAGES, + UNACKNOWLEDGED_BYTES, + UNACKNOWLEDGED_MESSAGES)); + + + + public static final String ID = "id"; + public static final String NAME = "name"; + public static final String STATE = "state"; + public static final String DURABLE = "durable"; + public static final String LIFETIME_POLICY = "lifetimePolicy"; + public static final String TIME_TO_LIVE = "timeToLive"; + public static final String CREATED = "created"; + public static final String UPDATED = "updated"; + + public static final String ALERT_REPEAT_GAP = "alertRepeatGap"; + public static final String ALERT_THRESHOLD_MESSAGE_AGE = "alertThresholdMessageAge"; + public static final String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize"; + public static final String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes"; + public static final String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages"; + public static final String ALTERNATE_EXCHANGE = "alternateExchange"; + public static final String EXCLUSIVE = "exclusive"; + public static final String MESSAGE_GROUP_KEY = "messageGroupKey"; + public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup"; + public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups"; + public static final String LVQ_KEY = "lvqKey"; + public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts"; + public static final String NO_LOCAL = "noLocal"; + public static final String OWNER = "owner"; + public static final String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes"; + public static final String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes"; + public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped"; + public static final String SORT_KEY = "sortKey"; + public static final String TYPE = "type"; + + + + + public static final Collection<String> AVAILABLE_ATTRIBUTES = + Collections.unmodifiableList( + Arrays.asList(ID, + NAME, + STATE, + DURABLE, + LIFETIME_POLICY, + TIME_TO_LIVE, + CREATED, + UPDATED, + TYPE, + ALTERNATE_EXCHANGE, + EXCLUSIVE, + OWNER, + NO_LOCAL, + LVQ_KEY, + SORT_KEY, + MESSAGE_GROUP_KEY, + MESSAGE_GROUP_DEFAULT_GROUP, + MESSAGE_GROUP_SHARED_GROUPS, + MAXIMUM_DELIVERY_ATTEMPTS, + QUEUE_FLOW_CONTROL_SIZE_BYTES, + QUEUE_FLOW_RESUME_SIZE_BYTES, + QUEUE_FLOW_STOPPED, + ALERT_THRESHOLD_MESSAGE_AGE, + ALERT_THRESHOLD_MESSAGE_SIZE, + ALERT_THRESHOLD_QUEUE_DEPTH_BYTES, + ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES, + ALERT_REPEAT_GAP + )); + + //children + Collection<Binding> getBindings(); + Collection<Consumer> getConsumers(); + + + //operations + + void visit(QueueEntryVisitor visitor); + + void delete(); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java new file mode 100644 index 0000000000..a73b2c9d3e --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +public enum State +{ + INITIALISING, + QUIESCED, + STOPPED, + ACTIVE, + DELETED +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java new file mode 100644 index 0000000000..2cb81eae82 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java @@ -0,0 +1,25 @@ +package org.apache.qpid.server.model; + +import java.util.Collection; + +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p/> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p/> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +public interface Statistics +{ + Collection<String> getStatisticNames(); + public Object getStatistic(String name); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java new file mode 100644 index 0000000000..d8493c6df4 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java @@ -0,0 +1,54 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.model; + +import java.util.UUID; + +import org.apache.qpid.exchange.ExchangeDefaults; + + +public class UUIDGenerator +{ + + public static UUID generateUUID() + { + return UUID.randomUUID(); + } + + public static UUID generateUUID(String objectName, String virtualHostName) + { + StringBuilder sb = new StringBuilder(); + sb.append(virtualHostName).append(objectName); + return UUID.nameUUIDFromBytes(sb.toString().getBytes()); + } + + public static UUID generateExchangeUUID(String echangeName, String virtualHostName) + { + if(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(echangeName) || echangeName.startsWith("amq.") || echangeName.startsWith("qpid.")) + { + return generateUUID(echangeName, virtualHostName); + } + else + { + return generateUUID(); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 263ed6d8cc..8d227d9677 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -62,6 +62,7 @@ import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.filter.JMSSelectorFilter; import org.apache.qpid.server.filter.SimpleFilterManager; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueEntry; @@ -199,6 +200,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS if(queue == null) { queue = AMQQueueFactory.createAMQQueueImpl( + UUIDGenerator.generateUUID(), name, isDurable, null, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 3f7eb18989..ef298b4731 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -37,6 +37,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.registry.IApplicationRegistry; @@ -314,7 +315,8 @@ public class Session_1_0 implements SessionEventListener ? null : (LifetimePolicy) properties.get(LIFETIME_POLICY); - final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName, + final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateUUID(), + queueName, false, // durable null, // owner false, // autodelete diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java index f6bf6626a0..46c2a635b7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java @@ -23,19 +23,20 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public class AMQPriorityQueue extends OutOfOrderQueue { - protected AMQPriorityQueue(final String name, + protected AMQPriorityQueue(UUID id, + final String name, final boolean durable, final String owner, final boolean autoDelete, boolean exclusive, final VirtualHost virtualHost, - Map<String, Object> arguments, - int priorities) + Map<String, Object> arguments, int priorities) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments); } public int getPriorities() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java index 8ff3f0148b..f2b7d7c56b 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java @@ -20,6 +20,10 @@ */ package org.apache.qpid.server.queue; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + import org.apache.qpid.AMQException; import org.apache.qpid.AMQSecurityException; import org.apache.qpid.exchange.ExchangeDefaults; @@ -30,12 +34,10 @@ import org.apache.qpid.server.configuration.ServerConfiguration; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeFactory; import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.HashMap; -import java.util.Map; - public class AMQQueueFactory { public static final String X_QPID_PRIORITIES = "x-qpid-priorities"; @@ -166,8 +168,13 @@ public class AMQQueueFactory } }; - - /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */ + /** + * Creates a new queue with a random id. + * + * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map) + * @deprecated because only called from unit tests + * */ + @Deprecated public static AMQQueue createAMQQueueImpl(AMQShortString name, boolean durable, AMQShortString owner, @@ -175,22 +182,28 @@ public class AMQQueueFactory boolean exclusive, VirtualHost virtualHost, final FieldTable arguments) throws AMQException { - return createAMQQueueImpl(name == null ? null : name.toString(), + return createAMQQueueImpl(UUIDGenerator.generateUUID(), + name == null ? null : name.toString(), durable, owner == null ? null : owner.toString(), autoDelete, - exclusive, - virtualHost, FieldTable.convertToMap(arguments)); + exclusive, virtualHost, FieldTable.convertToMap(arguments)); } - - public static AMQQueue createAMQQueueImpl(String queueName, + /** + * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value. + */ + public static AMQQueue createAMQQueueImpl(UUID id, + String queueName, boolean durable, String owner, boolean autoDelete, - boolean exclusive, - VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException + boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException { + if (id == null) + { + throw new IllegalArgumentException("Queue id must not be null"); + } if (queueName == null) { throw new IllegalArgumentException("Queue name must not be null"); @@ -241,19 +254,19 @@ public class AMQQueueFactory AMQQueue q; if(sortingKey != null) { - q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); + q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey); } else if(conflationKey != null) { - q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); + q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey); } else if(priorities > 1) { - q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); + q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities); } else { - q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); + q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments); } //Register the new queue @@ -287,7 +300,7 @@ public class AMQQueueFactory if(dlExchange == null) { - dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); + dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0); exchangeRegistry.registerExchange(dlExchange); @@ -309,7 +322,7 @@ public class AMQQueueFactory args.put(X_QPID_DLQ_ENABLED, false); args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0); - dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args); + dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args); //enter the dlq in the persistent store virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args)); @@ -364,7 +377,10 @@ public class AMQQueueFactory arguments.put(X_QPID_DLQ_ENABLED, true); } - AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments); + // we need queues that are defined in config to have deterministic ids. + UUID id = UUIDGenerator.generateUUID(queueName, host.getName()); + + AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments); q.configure(config); return q; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java index 2c645cc555..c2813bb7a5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java @@ -21,22 +21,23 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.virtualhost.VirtualHost; - import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.server.virtualhost.VirtualHost; public class ConflationQueue extends SimpleAMQQueue { - protected ConflationQueue(String name, + protected ConflationQueue(UUID id, + String name, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - Map<String, Object> args, - String conflationKey) + Map<String, Object> args, String conflationKey) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args); } public String getConflationKey() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java index 801fe55939..2493974d45 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java @@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentMap; @@ -95,4 +96,18 @@ public class DefaultQueueRegistry implements QueueRegistry } _queueMap.clear(); } + + @Override + public synchronized AMQQueue getQueue(UUID queueId) + { + Collection<AMQQueue> queues = _queueMap.values(); + for (AMQQueue queue : queues) + { + if (queue.getId().equals(queueId)) + { + return queue; + } + } + return null; + } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 53121fc031..89976ca16e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -5,15 +5,16 @@ import org.apache.qpid.server.subscription.SubscriptionList; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public abstract class OutOfOrderQueue extends SimpleAMQQueue { - protected OutOfOrderQueue(String name, boolean durable, String owner, - boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + protected OutOfOrderQueue(UUID id, String name, boolean durable, + String owner, boolean autoDelete, boolean exclusive, + VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); + super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments); } @Override diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java new file mode 100644 index 0000000000..1578d21321 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java @@ -0,0 +1,22 @@ +package org.apache.qpid.server.queue; + +/** +* Licensed to the Apache Software Foundation (ASF) under one or more +* contributor license agreements. See the NOTICE file distributed with +* this work for additional information regarding copyright ownership. +* The ASF licenses this file to You under the Apache License, Version 2.0 +* (the "License"); you may not use this file except in compliance with +* the License. You may obtain a copy of the License at +* <p/> +* http://www.apache.org/licenses/LICENSE-2.0 +* <p/> +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ +public interface QueueEntryVisitor +{ + boolean visit(QueueEntry entry); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java index 1ffc0a3560..72a54c9889 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java @@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Collection; +import java.util.UUID; public interface QueueRegistry { @@ -42,4 +43,6 @@ public interface QueueRegistry AMQQueue getQueue(String queue); void stopAllAndUnregisterMBeans(); + + AMQQueue getQueue(UUID queueId); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index e6f059a875..d7eb304c92 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -191,29 +191,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount(); private final MessageGroupManager _messageGroupManager; - protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) + protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments) { - this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments); + this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) + public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) { - this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); + this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments); } - public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) + public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments) { - this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); + this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments); } - protected SimpleAMQQueue(AMQShortString name, + protected SimpleAMQQueue(UUID id, + AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, - QueueEntryListFactory entryListFactory, - Map<String,Object> arguments) + QueueEntryListFactory entryListFactory, Map<String,Object> arguments) { if (name == null) @@ -236,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes _entries = entryListFactory.createQueueEntryList(this); _arguments = arguments; - _id = virtualHost.getConfigStore().createId(); + _id = id; _asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java index 446f57b142..b3566df0c4 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java @@ -24,6 +24,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; +import java.util.UUID; public class SortedQueue extends OutOfOrderQueue { @@ -33,12 +34,12 @@ public class SortedQueue extends OutOfOrderQueue private final Object _sortedQueueLock = new Object(); private final String _sortedPropertyName; - protected SortedQueue(final String name, final boolean durable, - final String owner, final boolean autoDelete, final boolean exclusive, - final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) + protected SortedQueue(UUID id, final String name, + final boolean durable, final String owner, final boolean autoDelete, + final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName) { - super(name, durable, owner, autoDelete, exclusive, virtualHost, - new SortedQueueEntryListFactory(sortedPropertyName), arguments); + super(id, name, durable, owner, autoDelete, exclusive, + virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments); this._sortedPropertyName = sortedPropertyName; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java index 73f127e097..1307b1dbd4 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java @@ -32,19 +32,19 @@ public interface ConfigurationRecoveryHandler public static interface QueueRecoveryHandler { - void queue(String queueName, String owner, boolean exclusive, FieldTable arguments); + void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments); ExchangeRecoveryHandler completeQueueRecovery(); } public static interface ExchangeRecoveryHandler { - void exchange(String exchangeName, String type, boolean autoDelete); + void exchange(UUID id, String exchangeName, String type, boolean autoDelete); BindingRecoveryHandler completeExchangeRecovery(); } public static interface BindingRecoveryHandler { - void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf); + void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf); BrokerLinkRecoveryHandler completeBindingRecovery(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java new file mode 100644 index 0000000000..1a67fdf540 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java @@ -0,0 +1,183 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.UUID; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.model.Binding; +import org.apache.qpid.server.model.Exchange; +import org.apache.qpid.server.model.LifetimePolicy; +import org.apache.qpid.server.model.Queue; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.util.MapJsonSerializer; + +public class ConfiguredObjectHelper +{ + /** + * Name of queue attribute to store queue creation arguments. + * <p> + * This attribute is not defined yet on Queue configured object interface. + */ + private static final String QUEUE_ARGUMENTS = "ARGUMENTS"; + + private MapJsonSerializer _serializer = new MapJsonSerializer(); + + public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh) + { + if (Queue.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + String queueName = (String) attributeMap.get(Queue.NAME); + String owner = (String) attributeMap.get(Queue.OWNER); + boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE); + @SuppressWarnings("unchecked") + Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(QUEUE_ARGUMENTS); + FieldTable arguments = null; + if (queueArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(queueArgumentsMap); + } + qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments); + } + } + + public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord) + { + Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes()); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + String newJson = _serializer.serialize(attributesMap); + ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson); + return newQueueRecord; + } + + public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Queue.NAME, queue.getName()); + attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner())); + attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive()); + if (arguments != null) + { + attributesMap.put(QUEUE_ARGUMENTS, FieldTable.convertToMap(arguments)); + } + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json); + return configuredObject; + } + + public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh) + { + if (Exchange.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + String exchangeName = (String) attributeMap.get(Exchange.NAME); + String exchangeType = (String) attributeMap.get(Exchange.TYPE); + String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY); + boolean autoDelete = lifeTimePolicy == null + || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE; + erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete); + } + } + + public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Exchange.NAME, exchange.getName()); + attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString())); + attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name() + : LifetimePolicy.PERMANENT.name()); + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json); + return configuredObject; + } + + public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh) + { + if (Binding.class.getName().equals(configuredObject.getType())) + { + Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes()); + UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE)); + UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE)); + String bindingName = (String) attributeMap.get(Binding.NAME); + + @SuppressWarnings("unchecked") + Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS); + FieldTable arguments = null; + if (bindingArgumentsMap != null) + { + arguments = FieldTable.convertToFieldTable(bindingArgumentsMap); + } + ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes())); + + brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB); + } + } + + public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding) + { + Map<String, Object> attributesMap = new HashMap<String, Object>(); + attributesMap.put(Binding.NAME, binding.getBindingKey()); + attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId()); + attributesMap.put(Binding.QUEUE, binding.getQueue().getId()); + Map<String, Object> arguments = binding.getArguments(); + if (arguments != null) + { + attributesMap.put(Binding.ARGUMENTS, arguments); + } + String json = _serializer.serialize(attributesMap); + ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json); + return configuredObject; + } + + public void recoverQueues(QueueRecoveryHandler qrh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadQueue(configuredObjectRecord, qrh); + } + } + + public void recoverExchanges(ExchangeRecoveryHandler erh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadExchange(configuredObjectRecord, erh); + } + } + + public void recoverBindings(BindingRecoveryHandler brh, List<ConfiguredObjectRecord> configuredObjects) + { + for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects) + { + loadQueueBinding(configuredObjectRecord, brh); + } + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java new file mode 100644 index 0000000000..95e1713d78 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.UUID; + +public class ConfiguredObjectRecord +{ + private UUID _id; + private String _type; + private String _attributes; + + public ConfiguredObjectRecord(UUID id, String type, String attributes) + { + super(); + _id = id; + _type = type; + _attributes = attributes; + } + + public UUID getId() + { + return _id; + } + + public void setId(UUID id) + { + _id = id; + } + + public String getType() + { + return _type; + } + + public String getAttributes() + { + return _attributes; + } + + @Override + public String toString() + { + return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]"; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java index 13c24e624e..655887e5c2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -69,28 +69,22 @@ public interface DurableConfigurationStore void removeExchange(Exchange exchange) throws AMQStoreException; /** - * Binds the specified queue to an exchange with a routing key. + * Store the queue binding. * - * @param exchange The exchange to bind to. - * @param routingKey The routing key to bind by. - * @param queue The queue to bind. - * @param args Additional parameters. + * @param binding queue binding * * @throws AMQStoreException if the operation fails for any reason. */ - void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + void bindQueue(Binding binding) throws AMQStoreException; /** - * Unbinds the specified from an exchange under a particular routing key. + * Removes queue binding * - * @param exchange The exchange to unbind from. - * @param routingKey The routing key to unbind. - * @param queue The queue to unbind. - * @param args Additional parameters. + * @param binding queue binding to remove * * @throws AMQStoreException If the operation fails for any reason. */ - void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException; + void unbindQueue(Binding binding) throws AMQStoreException; /** * Makes the specified queue persistent. diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java index 77df6c5abf..34c7d2d933 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.store; import org.apache.commons.configuration.Configuration; import org.apache.qpid.AMQStoreException; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; @@ -48,12 +48,12 @@ public class NullMessageStore implements MessageStore } @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void bindQueue(Binding binding) throws AMQStoreException { } @Override - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException + public void unbindQueue(Binding binding) throws AMQStoreException { } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java index b92d5f3e9b..bd4da648f9 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java @@ -20,15 +20,17 @@ */ package org.apache.qpid.server.store; +import java.util.UUID; + public interface TransactionLogRecoveryHandler { QueueEntryRecoveryHandler begin(MessageStore log); public static interface QueueEntryRecoveryHandler { - void queueEntry(String queuename, long messageId); - DtxRecordRecoveryHandler completeQueueEntryRecovery(); + + void queueEntry(UUID queueId, long messageId); } public static interface DtxRecordRecoveryHandler diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java index 0d81dd151d..576dca847d 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java @@ -20,7 +20,9 @@ */ package org.apache.qpid.server.store; +import java.util.UUID; + public interface TransactionLogResource { - public String getResourceName(); + public UUID getId(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java index 07d8bb97f8..0371cdcfcb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java @@ -29,6 +29,7 @@ import java.io.File; import java.io.IOException; import java.lang.ref.SoftReference; import java.nio.ByteBuffer; +import java.nio.charset.Charset; import java.sql.Blob; import java.sql.Connection; import java.sql.Driver; @@ -51,12 +52,15 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.federation.Bridge; import org.apache.qpid.server.federation.BrokerLink; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.ConfiguredObjectHelper; +import org.apache.qpid.server.store.ConfiguredObjectRecord; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.EventManager; @@ -89,12 +93,9 @@ public class DerbyMessageStore implements MessageStore private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; - private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE"; - private static final String QUEUE_TABLE_NAME = "QPID_QUEUE"; - private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS"; - private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY"; + private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES"; - private static final String META_DATA_TABLE_NAME = "QPID_META_DATA"; + private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA"; private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT"; private static final String LINKS_TABLE_NAME = "QPID_LINKS"; @@ -103,7 +104,9 @@ public class DerbyMessageStore implements MessageStore private static final String XID_TABLE_NAME = "QPID_XIDS"; private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS"; - private static final int DB_VERSION = 3; + private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS"; + + private static final int DB_VERSION = 6; @@ -119,38 +122,23 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )"; private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )"; - private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )"; - private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))"; - private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )"; - private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME; - private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; - private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?"; - private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME; - private static final String SELECT_FROM_BINDINGS = - "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name"; - private static final String FIND_BINDING = - "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? "; - private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )"; - private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; - private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?"; - private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )"; - private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?"; - private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)"; - private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?"; - - private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )"; - private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)"; - private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?"; - private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id"; - - - private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; - private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )"; - - private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)"; - private static final String SELECT_FROM_MESSAGE_CONTENT = - "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset"; - private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?"; + private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )"; + private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)"; + private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?"; + private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id"; + + + private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME + + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )"; + private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME + + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )"; + + private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + + "( message_id, content ) values (?, ?)"; + private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME + + " WHERE message_id = ?"; + private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + + " WHERE message_id = ?"; private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";; private static final String SELECT_FROM_META_DATA = @@ -214,18 +202,32 @@ public class DerbyMessageStore implements MessageStore private static final String CREATE_XID_ACTIONS_TABLE = "CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null," + " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " + - "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" + + "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" + ", PRIMARY KEY ( " + - "format, global_id, branch_id, action_type, queue_name, message_id))"; + "format, global_id, branch_id, action_type, queue_id, message_id))"; private static final String INSERT_INTO_XID_ACTIONS = "INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " + - "queue_name, message_id ) values (?,?,?,?,?,?) "; + "queue_id, message_id ) values (?,?,?,?,?,?) "; private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; private static final String SELECT_ALL_FROM_XID_ACTIONS = - "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME + + "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME + " WHERE format = ? and global_id = ? and branch_id = ?"; + private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))"; + private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME + + " ( id, object_type, attributes) VALUES (?,?,?)"; + private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME + + " set object_type =?, attributes = ? where id = ?"; + private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME + + " where id = ?"; + private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME; + + private final Charset UTF8_CHARSET = Charset.forName("UTF-8"); + private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006"; private final StateManager _stateManager; @@ -244,6 +246,8 @@ public class DerbyMessageStore implements MessageStore _stateManager = new StateManager(_eventManager); } + private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper(); + @Override public void configureConfigStore(String name, ConfigurationRecoveryHandler configRecoveryHandler, @@ -323,9 +327,7 @@ public class DerbyMessageStore implements MessageStore Connection conn = newAutoCommitConnection(); createVersionTable(conn); - createExchangeTable(conn); - createQueueTable(conn); - createBindingsTable(conn); + createConfiguredObjectsTable(conn); createQueueEntryTable(conn); createMetaDataTable(conn); createMessageContentTable(conn); @@ -366,15 +368,14 @@ public class DerbyMessageStore implements MessageStore } - - private void createExchangeTable(final Connection conn) throws SQLException + private void createConfiguredObjectsTable(final Connection conn) throws SQLException { - if(!tableExists(EXCHANGE_TABLE_NAME, conn)) + if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn)) { Statement stmt = conn.createStatement(); try { - stmt.execute(CREATE_EXCHANGE_TABLE); + stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE); } finally { @@ -383,39 +384,6 @@ public class DerbyMessageStore implements MessageStore } } - private void createQueueTable(final Connection conn) throws SQLException - { - if(!tableExists(QUEUE_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(CREATE_QUEUE_TABLE); - } - finally - { - stmt.close(); - } - } - } - - private void createBindingsTable(final Connection conn) throws SQLException - { - if(!tableExists(BINDINGS_TABLE_NAME, conn)) - { - Statement stmt = conn.createStatement(); - try - { - stmt.execute(CREATE_BINDINGS_TABLE); - } - finally - { - stmt.close(); - } - } - - } - private void createQueueEntryTable(final Connection conn) throws SQLException { if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn)) @@ -433,7 +401,7 @@ public class DerbyMessageStore implements MessageStore } - private void createMetaDataTable(final Connection conn) throws SQLException + private void createMetaDataTable(final Connection conn) throws SQLException { if(!tableExists(META_DATA_TABLE_NAME, conn)) { @@ -558,26 +526,25 @@ public class DerbyMessageStore implements MessageStore private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException { - try { + List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects(); ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this); - recoverQueues(qrh); + _configuredObjectHelper.recoverQueues(qrh, configuredObjects); ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery(); - List<String> exchanges = loadExchanges(erh); + _configuredObjectHelper.recoverExchanges(erh, configuredObjects); + ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery(); - recoverBindings(brh, exchanges); + _configuredObjectHelper.recoverBindings(brh, configuredObjects); + ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery(); recoverBrokerLinks(lrh); } catch (SQLException e) { - throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e); } - - } private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh) @@ -718,176 +685,6 @@ public class DerbyMessageStore implements MessageStore } - private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException - { - Connection conn = newAutoCommitConnection(); - try - { - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE); - try - { - - while(rs.next()) - { - String queueName = rs.getString(1); - _logger.debug("Got queue " + queueName); - String owner = rs.getString(2); - boolean exclusive = rs.getBoolean(3); - Blob argumentsAsBlob = rs.getBlob(4); - - byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length()); - FieldTable arguments; - if(dataAsBytes.length > 0) - { - - try - { - arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length); - } - catch (IOException e) - { - throw new RuntimeException("IO Exception should not be thrown",e); - } - } - else - { - arguments = null; - } - - qrh.queue(queueName, owner, exclusive, arguments); - - } - - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - - - private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException - { - - List<String> exchanges = new ArrayList<String>(); - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - Statement stmt = conn.createStatement(); - try - { - ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE); - try - { - while(rs.next()) - { - String exchangeName = rs.getString(1); - String type = rs.getString(2); - boolean autoDelete = rs.getShort(3) != 0; - - exchanges.add(exchangeName); - - erh.exchange(exchangeName, type, autoDelete); - - } - return exchanges; - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - if(conn != null) - { - conn.close(); - } - } - - } - - private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException - { - _logger.info("Recovering bindings..."); - - Connection conn = null; - try - { - conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS); - - try - { - ResultSet rs = stmt.executeQuery(); - - try - { - - while(rs.next()) - { - String exchangeName = rs.getString(1); - String queueName = rs.getString(2); - String bindingKey = rs.getString(3); - Blob arguments = rs.getBlob(4); - java.nio.ByteBuffer buf; - - if(arguments != null && arguments.length() != 0) - { - byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length()); - buf = java.nio.ByteBuffer.wrap(argumentBytes); - } - else - { - buf = null; - } - - brh.binding(exchangeName, queueName, bindingKey, buf); - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - if(conn != null) - { - conn.close(); - } - } - } - - - @Override public void close() throws Exception { @@ -999,60 +796,8 @@ public class DerbyMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - try - { - - - PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE); - try - { - stmt.setString(1, exchange.getNameShortString().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this exchange - if (!rs.next()) - { - - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE); - try - { - insertStmt.setString(1, exchange.getName().toString()); - insertStmt.setString(2, exchange.getTypeShortString().asString()); - insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0); - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange); + insertConfiguredObject(configuredObject); } } @@ -1060,150 +805,32 @@ public class DerbyMessageStore implements MessageStore @Override public void removeExchange(Exchange exchange) throws AMQStoreException { - - try + int results = removeConfiguredObject(exchange.getId()); + if (results == 0) { - Connection conn = newAutoCommitConnection(); - try - { - PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE); - try - { - stmt.setString(1, exchange.getNameShortString().toString()); - int results = stmt.executeUpdate(); - stmt.close(); - if(results == 0) - { - throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found"); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e); + throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found"); } } @Override - public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void bindQueue(Binding binding) throws AMQStoreException { if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - try - { - - PreparedStatement stmt = conn.prepareStatement(FIND_BINDING); - try - { - stmt.setString(1, exchange.getNameShortString().toString() ); - stmt.setString(2, queue.getNameShortString().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - ResultSet rs = stmt.executeQuery(); - try - { - // If this binding is not already in the store then create it. - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS); - try - { - insertStmt.setString(1, exchange.getNameShortString().toString() ); - insertStmt.setString(2, queue.getNameShortString().toString()); - insertStmt.setString(3, routingKey == null ? null : routingKey.toString()); - if(args != null) - { - // TODO - In Java 6 we could use create/set Blob - byte[] bytes = args.getDataAsBytes(); - ByteArrayInputStream bis = new ByteArrayInputStream(bytes); - insertStmt.setBinaryStream(4, bis, bytes.length); - } - else - { - insertStmt.setNull(4, Types.BLOB); - } - - insertStmt.executeUpdate(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " to database: " + e.getMessage(), e); - } - + ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding); + insertConfiguredObject(configuredObject); } - - } @Override - public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) + public void unbindQueue(Binding binding) throws AMQStoreException { - Connection conn = null; - PreparedStatement stmt = null; - - try - { - conn = newAutoCommitConnection(); - // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob - stmt = conn.prepareStatement(DELETE_FROM_BINDINGS); - stmt.setString(1, exchange.getNameShortString().toString() ); - stmt.setString(2, queue.getNameShortString().toString()); - stmt.setString(3, routingKey == null ? null : routingKey.toString()); - - int result = stmt.executeUpdate(); - - if(result != 1) - { - throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange " - + exchange.getNameShortString() + " in database: " + e.getMessage(), e); - } - finally + int results = removeConfiguredObject(binding.getId()); + if (results == 0) { - closePreparedStatement(stmt); - closeConnection(conn); + throw new AMQStoreException("Binding " + binding + " not found"); } } @@ -1220,68 +847,8 @@ public class DerbyMessageStore implements MessageStore if (_stateManager.isInState(State.ACTIVE)) { - try - { - Connection conn = newAutoCommitConnection(); - - PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); - try - { - stmt.setString(1, queue.getNameShortString().toString()); - ResultSet rs = stmt.executeQuery(); - try - { - - // If we don't have any data in the result set then we can add this queue - if (!rs.next()) - { - PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE); - - try - { - String owner = queue.getOwner() == null ? null : queue.getOwner().toString(); - - insertStmt.setString(1, queue.getNameShortString().toString()); - insertStmt.setString(2, owner); - insertStmt.setBoolean(3,queue.isExclusive()); - - final byte[] underlying; - if(arguments != null) - { - underlying = arguments.getDataAsBytes(); - } - else - { - underlying = new byte[0]; - } - - ByteArrayInputStream bis = new ByteArrayInputStream(underlying); - insertStmt.setBinaryStream(4,bis,underlying.length); - - insertStmt.execute(); - } - finally - { - insertStmt.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - conn.close(); - - } - catch (SQLException e) - { - throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); - } + ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments); + insertConfiguredObject(queueConfiguredObject); } } @@ -1299,54 +866,11 @@ public class DerbyMessageStore implements MessageStore { if (_stateManager.isInState(State.ACTIVE)) { - try + ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId()); + if (queueConfiguredObject != null) { - Connection conn = newAutoCommitConnection(); - - try - { - PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE); - try - { - stmt.setString(1, queue.getNameShortString().toString()); - - ResultSet rs = stmt.executeQuery(); - try - { - if (rs.next()) - { - PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY); - try - { - stmt2.setBoolean(1,queue.isExclusive()); - stmt2.setString(2, queue.getNameShortString().toString()); - - stmt2.execute(); - } - finally - { - stmt2.close(); - } - } - } - finally - { - rs.close(); - } - } - finally - { - stmt.close(); - } - } - finally - { - conn.close(); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e); + ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject); + updateConfiguredObject(newQueueRecord); } } @@ -1410,31 +934,11 @@ public class DerbyMessageStore implements MessageStore { AMQShortString name = queue.getNameShortString(); _logger.debug("public void removeQueue(AMQShortString name = " + name + "): called"); - Connection conn = null; - PreparedStatement stmt = null; - try + int results = removeConfiguredObject(queue.getId()); + if (results == 0) { - conn = newAutoCommitConnection(); - stmt = conn.prepareStatement(DELETE_FROM_QUEUE); - stmt.setString(1, name.toString()); - int results = stmt.executeUpdate(); - - if (results == 0) - { - throw new AMQStoreException("Queue " + name + " not found"); - } - } - catch (SQLException e) - { - throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e); - } - finally - { - closePreparedStatement(stmt); - closeConnection(conn); + throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found"); } - - } @Override @@ -1676,8 +1180,6 @@ public class DerbyMessageStore implements MessageStore public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { - String name = queue.getResourceName(); - Connection conn = connWrapper.getConnection(); @@ -1685,13 +1187,13 @@ public class DerbyMessageStore implements MessageStore { if (_logger.isDebugEnabled()) { - _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]"); + _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]"); } PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY); try { - stmt.setString(1,name); + stmt.setString(1, queue.getId().toString()); stmt.setLong(2,messageId); stmt.executeUpdate(); } @@ -1703,7 +1205,7 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { _logger.error("Failed to enqueue: " + e.getMessage(), e); - throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name + throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId() + " to database", e); } @@ -1711,8 +1213,6 @@ public class DerbyMessageStore implements MessageStore public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException { - String name = queue.getResourceName(); - Connection conn = connWrapper.getConnection(); @@ -1722,7 +1222,7 @@ public class DerbyMessageStore implements MessageStore PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY); try { - stmt.setString(1,name); + stmt.setString(1, queue.getId().toString()); stmt.setLong(2,messageId); int results = stmt.executeUpdate(); @@ -1730,12 +1230,14 @@ public class DerbyMessageStore implements MessageStore if(results != 1) { - throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name); + throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId()); } if (_logger.isDebugEnabled()) { - _logger.debug("Dequeuing message " + messageId + " on queue " + name ); + _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId()); } } finally @@ -1746,8 +1248,8 @@ public class DerbyMessageStore implements MessageStore catch (SQLException e) { _logger.error("Failed to dequeue: " + e.getMessage(), e); - throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name - + " from database", e); + throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + + " with id " + queue.getId() + " from database", e); } } @@ -1840,7 +1342,7 @@ public class DerbyMessageStore implements MessageStore stmt.setString(4, "E"); for(Transaction.Record record : enqueues) { - stmt.setString(5, record.getQueue().getResourceName()); + stmt.setString(5, record.getQueue().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -1851,7 +1353,7 @@ public class DerbyMessageStore implements MessageStore stmt.setString(4, "D"); for(Transaction.Record record : dequeues) { - stmt.setString(5, record.getQueue().getResourceName()); + stmt.setString(5, record.getQueue().getId().toString()); stmt.setLong(6, record.getMessage().getMessageNumber()); stmt.executeUpdate(); } @@ -2081,9 +1583,9 @@ public class DerbyMessageStore implements MessageStore while(rs.next()) { - String queueName = rs.getString(1); + String id = rs.getString(1); long messageId = rs.getLong(2); - queueEntryHandler.queueEntry(queueName,messageId); + queueEntryHandler.queueEntry(UUID.fromString(id), messageId); } } finally @@ -2137,13 +1639,13 @@ public class DerbyMessageStore implements MessageStore private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage { - private final String _queueName; private long _messageNumber; + private UUID _queueId; - public RecordImpl(String queueName, long messageNumber) + public RecordImpl(UUID queueId, long messageNumber) { - _queueName = queueName; _messageNumber = messageNumber; + _queueId = queueId; } @Override @@ -2177,9 +1679,9 @@ public class DerbyMessageStore implements MessageStore } @Override - public String getResourceName() + public UUID getId() { - return _queueName; + return _queueId; } } @@ -2237,10 +1739,10 @@ public class DerbyMessageStore implements MessageStore { String actionType = rs.getString(1); - String queueName = rs.getString(2); + UUID queueId = UUID.fromString(rs.getString(2)); long messageId = rs.getLong(3); - RecordImpl record = new RecordImpl(queueName, messageId); + RecordImpl record = new RecordImpl(queueId, messageId); List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues; records.add(record); } @@ -2319,11 +1821,11 @@ public class DerbyMessageStore implements MessageStore } - private void addContent(Connection conn, long messageId, int offset, ByteBuffer src) + private void addContent(Connection conn, long messageId, ByteBuffer src) { if(_logger.isDebugEnabled()) { - _logger.debug("Adding content chunk offset " + offset + " for message " +messageId); + _logger.debug("Adding content for message " +messageId); } PreparedStatement stmt = null; @@ -2336,20 +1838,15 @@ public class DerbyMessageStore implements MessageStore stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT); stmt.setLong(1,messageId); - stmt.setInt(2, offset); - stmt.setInt(3, offset+chunkData.length); - - - // TODO in Java 6 we could just use blobs ByteArrayInputStream bis = new ByteArrayInputStream(chunkData); - stmt.setBinaryStream(4, bis, chunkData.length); + stmt.setBinaryStream(2, bis, chunkData.length); stmt.executeUpdate(); } catch (SQLException e) { closeConnection(conn); - throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e); + throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e); } finally { @@ -2370,33 +1867,32 @@ public class DerbyMessageStore implements MessageStore stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); stmt.setLong(1,messageId); - stmt.setInt(2, offset); - stmt.setInt(3, offset+dst.remaining()); ResultSet rs = stmt.executeQuery(); int written = 0; - while(rs.next()) + if (rs.next()) { - int offsetInMessage = rs.getInt(1); - Blob dataAsBlob = rs.getBlob(2); + + Blob dataAsBlob = rs.getBlob(1); final int size = (int) dataAsBlob.length(); byte[] dataAsBytes = dataAsBlob.getBytes(1, size); - int posInArray = offset + written - offsetInMessage; - int count = size - posInArray; - if(count > dst.remaining()) + if (offset > size) { - count = dst.remaining(); + throw new RuntimeException("Offset " + offset + " is greater than message size " + size + + " for message id " + messageId + "!"); + } - dst.put(dataAsBytes,posInArray,count); - written+=count; - if(dst.remaining() == 0) + written = size - offset; + if(written > dst.remaining()) { - break; + written = dst.remaining(); } + + dst.put(dataAsBytes, offset, written); } return written; @@ -2635,7 +2131,7 @@ public class DerbyMessageStore implements MessageStore try { storeMetaData(conn, _messageId, _metaData); - DerbyMessageStore.this.addContent(conn, _messageId, 0, + DerbyMessageStore.this.addContent(conn, _messageId, _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); } finally @@ -2699,4 +2195,255 @@ public class DerbyMessageStore implements MessageStore { return _storeLocation; } + + private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + // If we don't have any data in the result set then we can add this configured object + if (!rs.next()) + { + PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS); + try + { + insertStmt.setString(1, configuredObject.getId().toString()); + insertStmt.setString(2, configuredObject.getType()); + if(configuredObject.getAttributes() == null) + { + insertStmt.setNull(3, Types.BLOB); + } + else + { + byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + insertStmt.setBinaryStream(3, bis, attributesAsBytes.length); + } + insertStmt.execute(); + } + finally + { + insertStmt.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e); + } + } + } + + private int removeConfiguredObject(UUID id) throws AMQStoreException + { + int results = 0; + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS); + try + { + stmt.setString(1, id.toString()); + results = stmt.executeUpdate(); + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e); + } + return results; + } + + private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException + { + if (_stateManager.isInState(State.ACTIVE)) + { + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, configuredObject.getId().toString()); + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS); + try + { + stmt2.setString(1, configuredObject.getType()); + if (configuredObject.getAttributes() != null) + { + byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET); + ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes); + stmt2.setBinaryStream(2, bis, attributesAsBytes.length); + } + else + { + stmt2.setNull(2, Types.BLOB); + } + stmt2.setString(3, configuredObject.getId().toString()); + stmt2.execute(); + } + finally + { + stmt2.close(); + } + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e); + } + } + } + + private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException + { + ConfiguredObjectRecord result = null; + try + { + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT); + try + { + stmt.setString(1, id.toString()); + ResultSet rs = stmt.executeQuery(); + try + { + if (rs.next()) + { + String type = rs.getString(1); + Blob blob = rs.getBlob(2); + String attributes = null; + if (blob != null) + { + attributes = blobToString(blob); + } + result = new ConfiguredObjectRecord(id, type, attributes); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + } + catch (SQLException e) + { + throw new AMQStoreException("Error loading of configured object with id " + id + " from database: " + + e.getMessage(), e); + } + return result; + } + + private String blobToString(Blob blob) throws SQLException + { + byte[] bytes = blob.getBytes(1, (int)blob.length()); + return new String(bytes, UTF8_CHARSET); + } + + private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException + { + ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>(); + Connection conn = newAutoCommitConnection(); + try + { + PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS); + try + { + ResultSet rs = stmt.executeQuery(); + try + { + while (rs.next()) + { + String id = rs.getString(1); + String objectType = rs.getString(2); + String attributes = blobToString(rs.getBlob(3)); + results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes)); + } + } + finally + { + rs.close(); + } + } + finally + { + stmt.close(); + } + } + finally + { + conn.close(); + } + return results; + } }
\ No newline at end of file diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 54b91a0ce2..79a8bc0e4c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.flow.WindowCreditManager; import org.apache.qpid.server.logging.messages.ExchangeMessages; import org.apache.qpid.server.message.MessageMetaData_0_10; import org.apache.qpid.server.message.MessageTransferMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.BaseQueue; @@ -683,13 +684,12 @@ public class ServerSessionDelegate extends SessionDelegate { String exchangeName = method.getExchange(); VirtualHost virtualHost = getVirtualHost(session); - Exchange exchange = getExchange(session, exchangeName); + ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); //we must check for any unsupported arguments present and throw not-implemented if(method.hasArguments()) { Map<String,Object> args = method.getArguments(); - //QPID-3392: currently we don't support any! if(!args.isEmpty()) { @@ -697,120 +697,113 @@ public class ServerSessionDelegate extends SessionDelegate return; } } - - if(method.getPassive()) + synchronized(exchangeRegistry) { - if(exchange == null) - { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'"); - - } - else - { - if(!exchange.getTypeShortString().toString().equals(method.getType()) && (method.getType() != null && method.getType().length() > 0)) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +"."); - } - } + Exchange exchange = getExchange(session, exchangeName); - } - else - { - if (exchange == null) + if(method.getPassive()) { - if(exchangeName.startsWith("amq.")) + if(exchange == null) { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'amq.'."); + exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'"); } - else if(exchangeName.startsWith("qpid.")) + else { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to declare exchange: " + exchangeName + - " which begins with reserved prefix 'qpid.'."); + if (!exchange.getTypeShortString().toString().equals(method.getType()) + && (method.getType() != null && method.getType().length() > 0)) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + "."); + } } - else + } + else + { + if (exchange == null) { - ExchangeRegistry exchangeRegistry = getExchangeRegistry(session); - ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); - - - - try + if (exchangeName.startsWith("amq.")) { - - exchange = exchangeFactory.createExchange(method.getExchange(), - method.getType(), - method.getDurable(), - method.getAutoDelete()); - - String alternateExchangeName = method.getAlternateExchange(); - boolean validAlternate; - if(alternateExchangeName != null && alternateExchangeName.length() != 0) + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved prefix 'amq.'."); + } + else if (exchangeName.startsWith("qpid.")) + { + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: " + + exchangeName + " which begins with reserved prefix 'qpid.'."); + } + else + { + ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory(); + try { - Exchange alternate = getExchange(session, alternateExchangeName); - if(alternate == null) + exchange = exchangeFactory.createExchange(method.getExchange(), + method.getType(), + method.getDurable(), + method.getAutoDelete()); + String alternateExchangeName = method.getAlternateExchange(); + boolean validAlternate; + if(alternateExchangeName != null && alternateExchangeName.length() != 0) { - validAlternate = false; + Exchange alternate = getExchange(session, alternateExchangeName); + if(alternate == null) + { + validAlternate = false; + } + else + { + exchange.setAlternateExchange(alternate); + validAlternate = true; + } } else { - exchange.setAlternateExchange(alternate); validAlternate = true; } - } - else - { - validAlternate = true; - } - - if(validAlternate) - { - if (exchange.isDurable()) + if(validAlternate) { - DurableConfigurationStore store = virtualHost.getMessageStore(); - store.createExchange(exchange); + if (exchange.isDurable()) + { + DurableConfigurationStore store = virtualHost.getMessageStore(); + store.createExchange(exchange); + } + exchangeRegistry.registerExchange(exchange); } - - exchangeRegistry.registerExchange(exchange); + else + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, + "Unknown alternate exchange " + alternateExchangeName); + } + } + catch(AMQUnknownExchangeType e) + { + exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); } - else + catch (AMQException e) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, - "Unknown alternate exchange " + alternateExchangeName); + exception(session, method, e, "Cannot declare exchange '" + exchangeName); } } - catch(AMQUnknownExchangeType e) + } + else + { + if(!exchange.getTypeShortString().toString().equals(method.getType())) { - exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType()); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to redeclare exchange: " + exchangeName + + " of type " + exchange.getTypeShortString() + + " to " + method.getType() +"."); } - catch (AMQException e) + else if(method.hasAlternateExchange() + && (exchange.getAlternateExchange() == null || + !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) { - exception(session, method, e, "Cannot declare exchange '" + exchangeName); + exception(session, method, ExecutionErrorCode.NOT_ALLOWED, + "Attempt to change alternate exchange of: " + exchangeName + + " from " + exchange.getAlternateExchange() + + " to " + method.getAlternateExchange() +"."); } } } - else - { - if(!exchange.getTypeShortString().toString().equals(method.getType())) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to redeclare exchange: " + exchangeName - + " of type " + exchange.getTypeShortString() - + " to " + method.getType() +"."); - } - else if(method.hasAlternateExchange() - && (exchange.getAlternateExchange() == null || - !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName()))) - { - exception(session, method, ExecutionErrorCode.NOT_ALLOWED, - "Attempt to change alternate exchange of: " + exchangeName - + " from " + exchange.getAlternateExchange() - + " to " + method.getAlternateExchange() +"."); - } - } - } } @@ -1396,8 +1389,8 @@ public class ServerSessionDelegate extends SessionDelegate { String owner = body.getExclusive() ? session.getClientID() : null; - final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(), - body.getExclusive(), virtualHost, body.getArguments()); + final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner, + body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments()); return queue; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java new file mode 100644 index 0000000000..2d9ba38555 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java @@ -0,0 +1,69 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.io.StringWriter; +import java.util.Map; + +import org.codehaus.jackson.map.ObjectMapper; +import org.codehaus.jackson.type.TypeReference; + +public class MapJsonSerializer +{ + private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>() + { + }; + + private ObjectMapper _mapper; + + public MapJsonSerializer() + { + _mapper = new ObjectMapper(); + } + + public String serialize(Map<String, Object> attributeMap) + { + StringWriter stringWriter = new StringWriter(); + try + { + _mapper.writeValue(stringWriter, attributeMap); + } + catch (Exception e) + { + throw new RuntimeException("Failure to serialize map:" + attributeMap, e); + } + return stringWriter.toString(); + } + + public Map<String, Object> deserialize(String json) + { + Map<String, Object> attributesMap = null; + try + { + attributesMap = _mapper.readValue(json, MAP_TYPE_REFERENCE); + } + catch (Exception e) + { + throw new RuntimeException("Failure to deserialize json:" + json, e); + } + return attributesMap; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 9333456c2e..e956806823 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -100,7 +100,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments) + public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments) { try { @@ -108,7 +108,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (q == null) { - q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost, + q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost, FieldTable.convertToMap(arguments)); _virtualHost.getQueueRegistry().registerQueue(q); } @@ -130,7 +130,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa return this; } - public void exchange(String exchangeName, String type, boolean autoDelete) + public void exchange(UUID id, String exchangeName, String type, boolean autoDelete) { try { @@ -139,7 +139,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS); if (exchange == null) { - exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); + exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0); _virtualHost.getExchangeRegistry().registerExchange(exchange); } } @@ -212,7 +212,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } for(Transaction.Record record : enqueues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -265,13 +265,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getResourceName())); + record.getQueue().getId().toString())); } } for(Transaction.Record record : dequeues) { - final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName()); + final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId()); if(queue != null) { final long messageId = record.getMessage().getMessageNumber(); @@ -315,7 +315,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa StringBuilder xidString = xidAsString(id); CurrentActor.get().message(_logSubject, TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(), - record.getQueue().getResourceName())); + record.getQueue().getId().toString())); } } @@ -354,21 +354,22 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false)); } - public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf) + @Override + public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf) { try { - Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName); + Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId); if (exchange == null) { - _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName); + _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId); return; } - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName)); + + AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); if (queue == null) { - _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName); + _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName()); } else { @@ -392,10 +393,10 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null) { - _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName + _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName() + ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")"); - bf.restoreBinding(bindingKey, queue, exchange, argumentMap); + bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap); } } } @@ -417,16 +418,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } - public void queueEntry(final String queueName, long messageId) + public void queueEntry(final UUID queueId, long messageId) { - AMQShortString queueNameShortString = new AMQShortString(queueName); - - AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString); - + AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId); try { if(queue != null) { + String queueName = queue.getName(); ServerMessage message = _recoveredMessages.get(messageId); _unusedMessages.remove(messageId); @@ -436,7 +435,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa if (_logger.isDebugEnabled()) { - _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString()); + _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName); } Integer count = _queueRecoveries.get(queueName); @@ -451,7 +450,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } else { - _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded"); + _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded"); Transaction txn = _store.newTransaction(); txn.dequeueMessage(queue, new DummyMessage(messageId)); txn.commitTranAsync(); @@ -459,15 +458,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa } else { - _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded"); + _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded"); Transaction txn = _store.newTransaction(); TransactionLogResource mockQueue = new TransactionLogResource() { - - public String getResourceName() + @Override + public UUID getId() { - return queueName; + return queueId; } }; txn.dequeueMessage(mockQueue, new DummyMessage(messageId)); @@ -479,9 +478,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa { throw new RuntimeException(e); } - - - } public DtxRecordRecoveryHandler completeQueueEntryRecovery() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index b6ee95a1cb..afd8fd9ed2 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -29,6 +29,7 @@ import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.UUID; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; @@ -45,6 +46,7 @@ import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.BaseQueue; @@ -274,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase public TestQueue(AMQShortString name) throws AMQException { - super(name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP); + super(UUIDGenerator.generateUUID(), name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP); ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java index 1fac4afe29..9034bf9c3a 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java @@ -26,6 +26,7 @@ import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.management.common.mbeans.ManagedExchange; import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; import org.apache.qpid.server.queue.QueueRegistry; @@ -52,7 +53,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testGeneralProperties() throws Exception { DirectExchange exchange = new DirectExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -67,7 +68,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testDirectExchangeMBean() throws Exception { DirectExchange exchange = new DirectExchange(); - exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -82,7 +83,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testTopicExchangeMBean() throws Exception { TopicExchange exchange = new TopicExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -97,7 +98,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testHeadersExchangeMBean() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -119,7 +120,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testHeadersExchangeMBeanMatchPropertyNoValue() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; @@ -137,7 +138,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase public void testInvalidHeaderBindingMalformed() throws Exception { HeadersExchange exchange = new HeadersExchange(); - exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); + exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true); ManagedObject managedObj = exchange.getManagedObject(); ManagedExchange mbean = (ManagedExchange)managedObj; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java index 2e3ff90df9..e123a968a4 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.queue; +import java.util.UUID; + import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java index 8041d59ffa..52ad4a7c5b 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java @@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction; import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter; import org.apache.qpid.server.registry.ApplicationRegistry; @@ -137,7 +138,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase } try { - _queue = new SimpleAMQQueue(_qname, false, _owner, false, false,null, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP); assertNull("Queue was created", _queue); } catch (IllegalArgumentException e) @@ -479,7 +480,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testAutoDeleteQueue() throws Exception { _queue.stop(); - _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); + _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP); _queue.setDeleteOnNoConsumers(true); _queue.registerSubscription(_subscription, false); AMQMessage message = createMessage(new Long(25)); @@ -691,8 +692,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testProcessQueueWithUniqueSelectors() throws Exception { TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory(); - SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false, - false, _virtualHost, factory, null) + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), "testQueue", false,"testOwner", + false, false, _virtualHost, factory, null) { @Override public void deliverAsync(Subscription sub) @@ -1028,8 +1029,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase int dequeueMessageIndex = 1; // create queue with overridden method deliverAsync - SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false, - new AMQShortString("testOwner"), false, false, _virtualHost, null) + SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), + false, new AMQShortString("testOwner"), false, false, _virtualHost, null) { @Override public void deliverAsync(Subscription sub) @@ -1099,8 +1100,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testEqueueDequeuedEntry() { // create a queue where each even entry is considered a dequeued - SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"), - false, false, _virtualHost, new QueueEntryListFactory() + SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory() { public QueueEntryList createQueueEntryList(AMQQueue queue) { @@ -1177,8 +1178,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase public void testActiveConsumerCount() throws Exception { - final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"), - false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); + final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("testActiveConsumerCount"), false, + new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null); //verify adding an active subscription increases the count final MockSubscription subscription1 = new MockSubscription(); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java new file mode 100644 index 0000000000..a1cbb2cbc8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java @@ -0,0 +1,377 @@ +package org.apache.qpid.server.store; + +import static org.mockito.Matchers.any; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyString; +import static org.mockito.Matchers.isA; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.times; + +import java.io.File; +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; + +import org.apache.commons.configuration.Configuration; +import org.apache.qpid.AMQStoreException; +import org.apache.qpid.common.AMQPFilterTypes; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.binding.Binding; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.logging.SystemOutMessageLogger; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.TestLogActor; +import org.apache.qpid.server.logging.subjects.TestBlankSubject; +import org.apache.qpid.server.message.EnqueableMessage; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.MockStoredMessage; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler; +import org.apache.qpid.server.store.TransactionLogRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler; +import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler; +import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler; +import org.apache.qpid.server.store.Transaction.Record; +import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory; +import org.apache.qpid.test.utils.QpidTestCase; +import org.apache.qpid.util.FileUtils; + +public class DurableConfigurationStoreTest extends QpidTestCase +{ + private static final String EXCHANGE_NAME = "exchangeName"; + private String _storePath; + private String _storeName; + private MessageStore _store; + private Configuration _configuration; + + private ConfigurationRecoveryHandler _recoveryHandler; + private QueueRecoveryHandler _queueRecoveryHandler; + private ExchangeRecoveryHandler _exchangeRecoveryHandler; + private BindingRecoveryHandler _bindingRecoveryHandler; + private ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler _linkRecoveryHandler; + private MessageStoreRecoveryHandler _messageStoreRecoveryHandler; + private StoredMessageRecoveryHandler _storedMessageRecoveryHandler; + private TransactionLogRecoveryHandler _logRecoveryHandler; + private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler; + private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler; + + private Exchange _exchange = mock(Exchange.class); + private static final String ROUTING_KEY = "routingKey"; + private static final String QUEUE_NAME = "queueName"; + private FieldTable _bindingArgs; + private UUID _queueId; + private UUID _exchangeId; + + public void setUp() throws Exception + { + super.setUp(); + + _queueId = UUIDGenerator.generateUUID(); + _exchangeId = UUIDGenerator.generateUUID(); + + _storeName = getName(); + _storePath = TMP_FOLDER + "/" + _storeName; + FileUtils.delete(new File(_storePath), true); + setTestSystemProperty("QPID_WORK", TMP_FOLDER); + _configuration = mock(Configuration.class); + _recoveryHandler = mock(ConfigurationRecoveryHandler.class); + _queueRecoveryHandler = mock(QueueRecoveryHandler.class); + _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class); + _bindingRecoveryHandler = mock(BindingRecoveryHandler.class); + _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class); + _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class); + _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class); + _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class); + _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class); + + when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler); + when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler); + when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler); + when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler); + when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler); + when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler); + when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler); + when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME)); + when(_exchange.getId()).thenReturn(_exchangeId); + when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn( + _storePath); + + _bindingArgs = new FieldTable(); + AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue(); + String argValue = "some selector expression"; + _bindingArgs.put(argKey, argValue); + + reopenStore(); + } + + public void tearDown() throws Exception + { + FileUtils.delete(new File(_storePath), true); + super.tearDown(); + } + + public void testCreateExchange() throws Exception + { + Exchange exchange = createTestExchange(); + _store.createExchange(exchange); + + reopenStore(); + verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true); + } + + public void testRemoveExchange() throws Exception + { + Exchange exchange = createTestExchange(); + _store.createExchange(exchange); + + _store.removeExchange(exchange); + + reopenStore(); + verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean()); + } + + public void testBindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange, + FieldTable.convertToMap(_bindingArgs)); + _store.bindQueue(binding); + + reopenStore(); + + ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes()); + + verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes); + } + + public void testUnbindQueue() throws Exception + { + AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false); + Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange, + FieldTable.convertToMap(_bindingArgs)); + _store.bindQueue(binding); + + _store.unbindQueue(binding); + reopenStore(); + + verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(), + isA(ByteBuffer.class)); + } + + public void testCreateQueueAMQQueue() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + _store.createQueue(queue); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null); + } + + public void testCreateQueueAMQQueueFieldTable() throws Exception + { + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments); + } + + public void testUpdateQueue() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + // update the queue to have exclusive=false + queue = createTestQueue(getName(), getName() + "Owner", false); + _store.updateQueue(queue); + + reopenStore(); + verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments); + } + + public void testRemoveQueue() throws Exception + { + // create queue + AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true); + Map<String, Object> attributes = new HashMap<String, Object>(); + attributes.put("x-qpid-dlq-enabled", Boolean.TRUE); + attributes.put("x-qpid-maximum-delivery-count", new Integer(10)); + FieldTable arguments = FieldTable.convertToFieldTable(attributes); + _store.createQueue(queue, arguments); + + // remove queue + _store.removeQueue(queue); + reopenStore(); + verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(), + any(FieldTable.class)); + } + + private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException + { + AMQQueue queue = mock(AMQQueue.class); + when(queue.getName()).thenReturn(queueName); + when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName)); + when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner)); + when(queue.isExclusive()).thenReturn(exclusive); + when(queue.getId()).thenReturn(_queueId); + return queue; + } + + private Exchange createTestExchange() + { + Exchange exchange = mock(Exchange.class); + when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName())); + when(exchange.getName()).thenReturn(getName()); + when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type")); + when(exchange.isAutoDelete()).thenReturn(true); + when(exchange.getId()).thenReturn(_exchangeId); + return exchange; + } + + private void reopenStore() throws Exception + { + if (_store != null) + { + _store.close(); + } + _store = createStore(); + + _store.configureConfigStore(_storeName, _recoveryHandler, _configuration); + _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration); + _store.activate(); + } + + protected MessageStore createStore() throws Exception + { + String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY); + if (storeFactoryClass == null) + { + storeFactoryClass = DerbyMessageStoreFactory.class.getName(); + } + CurrentActor.set(new TestLogActor(new SystemOutMessageLogger())); + MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance(); + return factory.createMessageStore(); + } + + public void testRecordXid() throws Exception + { + Record enqueueRecord = getTestRecord(1); + Record dequeueRecord = getTestRecord(2); + Record[] enqueues = { enqueueRecord }; + Record[] dequeues = { dequeueRecord }; + byte[] globalId = new byte[] { 1 }; + byte[] branchId = new byte[] { 2 }; + + Transaction transaction = _store.newTransaction(); + transaction.recordXid(1l, globalId, branchId, enqueues, dequeues); + transaction.commitTran(); + reopenStore(); + verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + + transaction = _store.newTransaction(); + transaction.removeXid(1l, globalId, branchId); + transaction.commitTran(); + + reopenStore(); + verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues); + } + + private Record getTestRecord(long messageNumber) + { + UUID queueId1 = UUIDGenerator.generateUUID(); + TransactionLogResource queue1 = mock(TransactionLogResource.class); + when(queue1.getId()).thenReturn(queueId1); + EnqueableMessage message1 = mock(EnqueableMessage.class); + when(message1.isPersistent()).thenReturn(true); + when(message1.getMessageNumber()).thenReturn(messageNumber); + when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber)); + Record enqueueRecord = new TestRecord(queue1, message1); + return enqueueRecord; + } + + private static class TestRecord implements Record + { + private TransactionLogResource _queue; + private EnqueableMessage _message; + + public TestRecord(TransactionLogResource queue, EnqueableMessage message) + { + super(); + _queue = queue; + _message = message; + } + + @Override + public TransactionLogResource getQueue() + { + return _queue; + } + + @Override + public EnqueableMessage getMessage() + { + return _message; + } + + @Override + public int hashCode() + { + final int prime = 31; + int result = 1; + result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode()); + result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode()); + return result; + } + + @Override + public boolean equals(Object obj) + { + if (this == obj) + { + return true; + } + if (obj == null) + { + return false; + } + if (!(obj instanceof Record)) + { + return false; + } + Record other = (Record) obj; + if (_message == null && other.getMessage() != null) + { + return false; + } + if (_queue == null && other.getQueue() != null) + { + return false; + } + if (_message.getMessageNumber() != other.getMessage().getMessageNumber()) + { + return false; + } + return _queue.getId().equals(other.getQueue().getId()); + } + + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java index c589bd108b..3fb0776083 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java @@ -39,6 +39,7 @@ import org.apache.qpid.server.exchange.ExchangeType; import org.apache.qpid.server.exchange.TopicExchange; import org.apache.qpid.server.message.AMQMessage; import org.apache.qpid.server.message.MessageMetaData; +import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQPriorityQueue; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.AMQQueueFactory; @@ -58,6 +59,7 @@ import java.io.File; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.UUID; /** * This tests the MessageStores by using the available interfaces. @@ -739,7 +741,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase try { - exchange = type.newInstance(getVirtualHost(), name, durable, 0, false); + exchange = type.newInstance(UUIDGenerator.generateUUID(), getVirtualHost(), name, durable, 0, false); } catch (AMQException e) { diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java index 5a11a7aa32..8a34e92985 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java @@ -20,6 +20,8 @@ */ package org.apache.qpid.server.util; +import java.util.UUID; + import org.apache.commons.configuration.XMLConfiguration; import org.apache.qpid.AMQException; diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java new file mode 100644 index 0000000000..56567523df --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java @@ -0,0 +1,53 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.util; + +import java.util.HashMap; +import java.util.Map; + +import junit.framework.TestCase; + +public class MapJsonSerializerTest extends TestCase +{ + private MapJsonSerializer _serializer; + + protected void setUp() throws Exception + { + super.setUp(); + _serializer = new MapJsonSerializer(); + + } + + public void testSerializeDeserialize() + { + Map<String, Object> testMap = new HashMap<String, Object>(); + testMap.put("string", "Test String"); + testMap.put("integer", new Integer(10)); + testMap.put("long", new Long(Long.MAX_VALUE)); + testMap.put("boolean", Boolean.TRUE); + + String jsonString = _serializer.serialize(testMap); + Map<String, Object> deserializedMap = _serializer.deserialize(jsonString); + + assertEquals(deserializedMap, testMap); + } + +} |
