summaryrefslogtreecommitdiff
path: root/qpid/java/broker
diff options
context:
space:
mode:
authorRobert Gemmell <robbie@apache.org>2012-04-17 09:01:44 +0000
committerRobert Gemmell <robbie@apache.org>2012-04-17 09:01:44 +0000
commit3203eea7641e1b0f39de96d797db7c54423b7f02 (patch)
treef2563ba4a85ac54765d8f62663b60853846b3a89 /qpid/java/broker
parentdeab61acfe5f4edaae121cf6b9fa5d4b9e42803f (diff)
downloadqpid-python-3203eea7641e1b0f39de96d797db7c54423b7f02.tar.gz
QPID-3923: Store queue, exchange and binding as configured objects in bdb store
Applied patch by Oleksandr Rudyy <orudyy@gmail.com>, Phil Harvey <phil@philharveyonline.com>, and myself. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1327003 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java17
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java65
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java39
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java7
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java10
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java75
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java38
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java229
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java73
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java91
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java43
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java27
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java146
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java30
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java25
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java54
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java13
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java15
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java11
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java6
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java183
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java65
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java20
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java6
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java6
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java4
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java987
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java173
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java69
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java56
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java13
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java21
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java377
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java4
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java2
-rw-r--r--qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java53
61 files changed, 2387 insertions, 914 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
index 2d6f7e0946..034a4ae53c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/qmf/ManagementExchange.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.topic.TopicNormalizer;
import org.apache.qpid.server.exchange.topic.TopicParser;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.virtualhost.HouseKeepingTask;
@@ -83,7 +84,8 @@ public class ManagementExchange implements Exchange, QMFService.Listener
private class ManagementQueue implements BaseQueue
{
- private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + UUID.randomUUID().toString();
+ private final UUID QUEUE_ID = UUIDGenerator.generateUUID();
+ private final String NAME_AS_STRING = "##__mgmt_pseudo_queue__##" + QUEUE_ID.toString();
private final AMQShortString NAME_AS_SHORT_STRING = new AMQShortString(NAME_AS_STRING);
public void enqueue(ServerMessage message) throws AMQException
@@ -129,9 +131,10 @@ public class ManagementExchange implements Exchange, QMFService.Listener
return NAME_AS_SHORT_STRING;
}
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return NAME_AS_STRING;
+ return QUEUE_ID;
}
}
@@ -155,14 +158,14 @@ public class ManagementExchange implements Exchange, QMFService.Listener
return ManagementExchange.class;
}
- public ManagementExchange newInstance(VirtualHost host,
+ public ManagementExchange newInstance(UUID id, VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
ManagementExchange exch = new ManagementExchange();
- exch.initialise(host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, ticket, autoDelete);
return exch;
}
@@ -183,7 +186,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener
return QPID_MANAGEMENT_TYPE;
}
- public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
throws AMQException
{
if(!QPID_MANAGEMENT.equals(name))
@@ -191,7 +194,7 @@ public class ManagementExchange implements Exchange, QMFService.Listener
throw new AMQException("Can't create more than one Management exchange");
}
_virtualHost = host;
- _id = host.getConfigStore().createId();
+ _id = id;
_virtualHost.scheduleHouseKeepingTask(_virtualHost.getBroker().getManagementPublishInterval(), new UpdateTask(_virtualHost));
getConfigStore().addConfiguredObject(this);
getQMFService().addListener(this);
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
index 7ef06ce0f8..0f32b98aa8 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQBrokerManagerMBean.java
@@ -33,6 +33,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.ManagementActor;
import org.apache.qpid.server.management.AMQManagedObject;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.AMQQueueMBean;
@@ -48,6 +49,7 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* This MBean implements the broker management interface and exposes the
@@ -171,8 +173,8 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
Exchange exchange = _exchangeRegistry.getExchange(new AMQShortString(exchangeName));
if (exchange == null)
{
- exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName), new AMQShortString(type),
- durable, false, 0);
+ exchange = _exchangeFactory.createExchange(new AMQShortString(exchangeName),
+ new AMQShortString(type), durable, false, 0);
_exchangeRegistry.registerExchange(exchange);
if (durable)
{
@@ -244,45 +246,42 @@ public class AMQBrokerManagerMBean extends AMQManagedObject implements ManagedBr
public void createNewQueue(String queueName, String owner, boolean durable, Map<String,Object> arguments) throws JMException
{
final AMQShortString queueNameAsAMQShortString = new AMQShortString(queueName);
- AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
- if (queue != null)
+ synchronized (_queueRegistry)
{
- throw new JMException("The queue \"" + queueName + "\" already exists.");
- }
-
- CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger()));
- try
- {
- AMQShortString ownerShortString = null;
- if (owner != null)
+ AMQQueue queue = _queueRegistry.getQueue(queueNameAsAMQShortString);
+ if (queue != null)
{
- ownerShortString = new AMQShortString(owner);
+ throw new JMException("The queue \"" + queueName + "\" already exists.");
}
- FieldTable args = null;
- if(arguments != null)
+ CurrentActor.set(new ManagementActor(getLogActor().getRootMessageLogger()));
+ try
{
- args = FieldTable.convertToFieldTable(arguments);
- }
- final VirtualHost virtualHost = getVirtualHost();
+ FieldTable args = null;
+ if(arguments != null)
+ {
+ args = FieldTable.convertToFieldTable(arguments);
+ }
+ final VirtualHost virtualHost = getVirtualHost();
+
+ queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, durable, owner,
+ false, false, getVirtualHost(), arguments);
+ if (queue.isDurable() && !queue.isAutoDelete())
+ {
+ getVirtualHost().getMessageStore().createQueue(queue, args);
+ }
- queue = AMQQueueFactory.createAMQQueueImpl(queueNameAsAMQShortString, durable, ownerShortString,
- false, false, getVirtualHost(), args);
- if (queue.isDurable() && !queue.isAutoDelete())
+ virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null);
+ }
+ catch (AMQException ex)
{
- getVirtualHost().getMessageStore().createQueue(queue, args);
+ JMException jme = new JMException(ex.toString());
+ throw new MBeanException(jme, "Error in creating queue " + queueName);
+ }
+ finally
+ {
+ CurrentActor.remove();
}
-
- virtualHost.getBindingFactory().addBinding(queueName, queue, _exchangeRegistry.getDefaultExchange(), null);
- }
- catch (AMQException ex)
- {
- JMException jme = new JMException(ex.toString());
- throw new MBeanException(jme, "Error in creating queue " + queueName);
- }
- finally
- {
- CurrentActor.remove();
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
index 8e44da095a..2efd4cee26 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/Binding.java
@@ -117,7 +117,7 @@ public class Binding
public String toString()
{
- return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+"}";
+ return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }";
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
index 2460be4705..abf252c733 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/binding/BindingFactory.java
@@ -24,7 +24,6 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQInternalException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.server.configuration.BindingConfig;
import org.apache.qpid.server.configuration.BindingConfigType;
import org.apache.qpid.server.configuration.ConfigStore;
@@ -33,11 +32,13 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.messages.BindingMessages;
import org.apache.qpid.server.logging.subjects.BindingLogSubject;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class BindingFactory
@@ -57,9 +58,9 @@ public class BindingFactory
//TODO : persist creation time
private long _createTime = System.currentTimeMillis();
- private BindingImpl(String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
+ private BindingImpl(UUID id, String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> arguments)
{
- super(queue.getVirtualHost().getConfigStore().createId(), bindingKey, queue, exchange, arguments);
+ super(id, bindingKey, queue, exchange, arguments);
_logSubject = new BindingLogSubject(bindingKey,exchange,queue);
}
@@ -116,19 +117,19 @@ public class BindingFactory
public boolean addBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
- return makeBinding(bindingKey, queue, exchange, arguments, false, false);
+ return makeBinding(null, bindingKey, queue, exchange, arguments, false, false);
}
- public boolean replaceBinding(final String bindingKey,
+ public boolean replaceBinding(final UUID id, final String bindingKey,
final AMQQueue queue,
final Exchange exchange,
final Map<String, Object> arguments) throws AMQSecurityException, AMQInternalException
{
- return makeBinding(bindingKey, queue, exchange, arguments, false, true);
+ return makeBinding(id, bindingKey, queue, exchange, arguments, false, true);
}
- private boolean makeBinding(String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
+ private boolean makeBinding(UUID id, String bindingKey, AMQQueue queue, Exchange exchange, Map<String, Object> arguments, boolean restore, boolean force) throws AMQSecurityException, AMQInternalException
{
assert queue != null;
final Exchange defaultExchange = _virtualHost.getExchangeRegistry().getDefaultExchange();
@@ -163,9 +164,12 @@ public class BindingFactory
}
}
-
- BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
- BindingImpl existingMapping = _bindings.putIfAbsent(b,b);
+ if (id == null)
+ {
+ id = UUIDGenerator.generateUUID();
+ }
+ BindingImpl b = new BindingImpl(id, bindingKey, queue, exchange, arguments);
+ BindingImpl existingMapping = _bindings.putIfAbsent(b, b);
if (existingMapping == null || force)
{
if (existingMapping != null)
@@ -175,7 +179,7 @@ public class BindingFactory
if (b.isDurable() && !restore)
{
- _virtualHost.getMessageStore().bindQueue(exchange,new AMQShortString(bindingKey),queue,FieldTable.convertToFieldTable(arguments));
+ _virtualHost.getMessageStore().bindQueue(b);
}
queue.addQueueDeleteTask(b);
@@ -198,9 +202,9 @@ public class BindingFactory
return _virtualHost.getConfigStore();
}
- public void restoreBinding(final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
+ public void restoreBinding(final UUID id, final String bindingKey, final AMQQueue queue, final Exchange exchange, final Map<String, Object> argumentMap) throws AMQSecurityException, AMQInternalException
{
- makeBinding(bindingKey,queue,exchange,argumentMap,true, false);
+ makeBinding(id, bindingKey,queue,exchange,argumentMap,true, false);
}
public void removeBinding(final Binding b) throws AMQSecurityException, AMQInternalException
@@ -239,7 +243,7 @@ public class BindingFactory
}
}
- BindingImpl b = _bindings.remove(new BindingImpl(bindingKey,queue,exchange,arguments));
+ BindingImpl b = _bindings.remove(new BindingImpl(null, bindingKey,queue,exchange,arguments));
if (b != null)
{
@@ -250,10 +254,7 @@ public class BindingFactory
if (b.isDurable())
{
- _virtualHost.getMessageStore().unbindQueue(exchange,
- new AMQShortString(bindingKey),
- queue,
- FieldTable.convertToFieldTable(arguments));
+ _virtualHost.getMessageStore().unbindQueue(b);
}
b.logDestruction();
getConfigStore().removeConfiguredObject(b);
@@ -280,7 +281,7 @@ public class BindingFactory
arguments = Collections.emptyMap();
}
- BindingImpl b = new BindingImpl(bindingKey,queue,exchange,arguments);
+ BindingImpl b = new BindingImpl(null, bindingKey,queue,exchange,arguments);
return _bindings.get(b);
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index af49168a80..9493f400f2 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -114,7 +114,7 @@ public abstract class AbstractExchange implements Exchange, Managable
*/
protected abstract AbstractExchangeMBean createMBean() throws JMException;
- public void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ public void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
throws AMQException
{
_virtualHost = host;
@@ -123,7 +123,7 @@ public abstract class AbstractExchange implements Exchange, Managable
_autoDelete = autoDelete;
_ticket = ticket;
- _id = getConfigStore().createId();
+ _id = id;
getConfigStore().addConfiguredObject(this);
createAndRegisterMBean();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
index 153419de1b..5058f91995 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
@@ -25,9 +25,11 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.AMQUnknownExchangeType;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.qmf.ManagementExchange;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -35,6 +37,7 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
+import java.util.UUID;
public class DefaultExchangeFactory implements ExchangeFactory
{
@@ -76,17 +79,29 @@ public class DefaultExchangeFactory implements ExchangeFactory
return publicTypes;
}
-
-
public Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete)
- throws AMQException
+ throws AMQException
{
return createExchange(new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
}
- public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable, boolean autoDelete,
- int ticket)
+ public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete)
+ throws AMQException
+ {
+ return createExchange(id, new AMQShortString(exchange), new AMQShortString(type), durable, autoDelete, 0);
+ }
+
+ public Exchange createExchange(AMQShortString exchange, AMQShortString type, boolean durable,
+ boolean autoDelete, int ticket)
+ throws AMQException
+ {
+ UUID id = UUIDGenerator.generateExchangeUUID(exchange.asString(), _host.getName());
+ return createExchange(id, exchange, type, durable, autoDelete, ticket);
+ }
+
+ public Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
+ boolean autoDelete, int ticket)
throws AMQException
{
// Check access
@@ -102,7 +117,7 @@ public class DefaultExchangeFactory implements ExchangeFactory
throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
}
- Exchange e = exchType.newInstance(_host, exchange, durable, ticket, autoDelete);
+ Exchange e = exchType.newInstance(id, _host, exchange, durable, ticket, autoDelete);
return e;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
index 33e73b4668..bf4184bf0b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
@@ -30,6 +30,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -174,4 +175,25 @@ public class DefaultExchangeRegistry implements ExchangeRegistry
_exchangeMapStr.clear();
}
+ @Override
+ public synchronized Exchange getExchange(UUID exchangeId)
+ {
+ if (exchangeId == null)
+ {
+ return getDefaultExchange();
+ }
+ else
+ {
+ Collection<Exchange> exchanges = _exchangeMap.values();
+ for (Exchange exchange : exchanges)
+ {
+ if (exchange.getId().equals(exchangeId))
+ {
+ return exchange;
+ }
+ }
+ return null;
+ }
+ }
+
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 9525324f57..af9322764a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -36,6 +36,7 @@ import javax.management.JMException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -103,14 +104,14 @@ public class DirectExchange extends AbstractExchange
return DirectExchange.class;
}
- public DirectExchange newInstance(VirtualHost host,
+ public DirectExchange newInstance(UUID id, VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
DirectExchange exch = new DirectExchange();
- exch.initialise(host,name,durable,ticket,autoDelete);
+ exch.initialise(id, host,name,durable,ticket,autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
index 0bcfc3a3da..289cb1a923 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/Exchange.java
@@ -36,6 +36,7 @@ import javax.management.JMException;
import java.util.Collection;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
public interface Exchange extends ExchangeReferrer, ExchangeConfig
{
@@ -50,7 +51,7 @@ public interface Exchange extends ExchangeReferrer, ExchangeConfig
AMQShortString getTypeShortString();
- void initialise(VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
+ void initialise(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket, boolean autoDelete)
throws AMQException, JMException;
boolean isDurable();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
index 577da79028..aae4ae89bb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
@@ -25,6 +25,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import java.util.Collection;
+import java.util.UUID;
public interface ExchangeFactory
@@ -40,4 +41,10 @@ public interface ExchangeFactory
Collection<ExchangeType<? extends Exchange>> getPublicCreatableTypes();
Exchange createExchange(String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
+
+ Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQException;
+
+ Exchange createExchange(UUID id, AMQShortString exchange, AMQShortString type, boolean durable,
+ boolean autoDelete, int ticket)
+ throws AMQException;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
index 335efaeaa2..ba4f57a8e0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeInitialiser.java
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.server.exchange;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
index db244c114b..795ae2e140 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
@@ -24,6 +24,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import java.util.Collection;
+import java.util.UUID;
public interface ExchangeRegistry
@@ -54,4 +55,6 @@ public interface ExchangeRegistry
void unregisterExchange(String exchange, boolean ifUnused) throws ExchangeInUseException, AMQException;
void clearAndUnregisterMbeans();
+
+ Exchange getExchange(UUID exchangeId);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java
index ce339c4e29..a01e41f039 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/ExchangeType.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.exchange;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -29,7 +31,7 @@ public interface ExchangeType<T extends Exchange>
{
public AMQShortString getName();
public Class<T> getExchangeClass();
- public T newInstance(VirtualHost host, AMQShortString name,
+ public T newInstance(UUID id, VirtualHost host, AMQShortString name,
boolean durable, int ticket, boolean autoDelete) throws AMQException;
public AMQShortString getDefaultExchangeName();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index f9ad2fad87..5ebcfd095f 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -34,6 +34,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost;
import javax.management.JMException;
import java.util.ArrayList;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
public class FanoutExchange extends AbstractExchange
@@ -65,14 +66,14 @@ public class FanoutExchange extends AbstractExchange
return FanoutExchange.class;
}
- public FanoutExchange newInstance(VirtualHost host,
+ public FanoutExchange newInstance(UUID id, VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
FanoutExchange exch = new FanoutExchange();
- exch.initialise(host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, ticket, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 2700a7cda3..16ba3c0431 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -37,6 +37,7 @@ import javax.management.JMException;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -93,12 +94,12 @@ public class HeadersExchange extends AbstractExchange
return HeadersExchange.class;
}
- public HeadersExchange newInstance(VirtualHost host, AMQShortString name, boolean durable, int ticket,
+ public HeadersExchange newInstance(UUID id, VirtualHost host, AMQShortString name, boolean durable, int ticket,
boolean autoDelete) throws AMQException
{
HeadersExchange exch = new HeadersExchange();
- exch.initialise(host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, ticket, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 7ce84b7a89..7ea7a41826 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -27,6 +27,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
+import java.util.UUID;
import java.util.WeakHashMap;
import java.util.concurrent.ConcurrentHashMap;
import javax.management.JMException;
@@ -70,14 +71,14 @@ public class TopicExchange extends AbstractExchange
return TopicExchange.class;
}
- public TopicExchange newInstance(VirtualHost host,
+ public TopicExchange newInstance(UUID id, VirtualHost host,
AMQShortString name,
boolean durable,
int ticket,
boolean autoDelete) throws AMQException
{
TopicExchange exch = new TopicExchange();
- exch.initialise(host, name, durable, ticket, autoDelete);
+ exch.initialise(id, host, name, durable, ticket, autoDelete);
return exch;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
index 4b4bdd4efb..c7046f8e53 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/federation/Bridge.java
@@ -767,13 +767,13 @@ public class Bridge implements BridgeConfig
try
{
- _queue = AMQQueueFactory.createAMQQueueImpl(_tmpQueueName,
+ _queue = AMQQueueFactory.createAMQQueueImpl(null,
+ _tmpQueueName,
isDurable(),
_link.getFederationTag(),
false,
false,
- getVirtualHost(),
- options);
+ getVirtualHost(), options);
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
index bb979d5441..49ca934966 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueBindHandler.java
@@ -134,7 +134,7 @@ public class QueueBindHandler implements StateAwareMethodListener<QueueBindBody>
Map<String, Object> oldArgs = oldBinding.getArguments();
if((oldArgs == null && !arguments.isEmpty()) || (oldArgs != null && !oldArgs.equals(arguments)))
{
- virtualHost.getBindingFactory().replaceBinding(bindingKey, queue, exch, arguments);
+ virtualHost.getBindingFactory().replaceBinding(oldBinding.getId(), bindingKey, queue, exch, arguments);
}
}
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
index 7d993ae14d..396829df91 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java
@@ -24,6 +24,7 @@ import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.QueueDeclareBody;
import org.apache.qpid.framing.QueueDeclareOkBody;
@@ -31,6 +32,7 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
@@ -43,6 +45,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collections;
+import java.util.Map;
import java.util.UUID;
public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclareBody>
@@ -219,10 +222,11 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
throws AMQException
{
final QueueRegistry registry = virtualHost.getQueueRegistry();
- AMQShortString owner = body.getExclusive() ? session.getContextKey() : null;
+ String owner = body.getExclusive() ? AMQShortString.toString(session.getContextKey()) : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
- body.getExclusive(),virtualHost, body.getArguments());
+ Map<String, Object> arguments = FieldTable.convertToMap(body.getArguments());
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), AMQShortString.toString(queueName), body.getDurable(), owner, body.getAutoDelete(),
+ body.getExclusive(),virtualHost, arguments);
if (body.getExclusive() && !body.getDurable())
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
new file mode 100644
index 0000000000..fdb009386c
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Binding.java
@@ -0,0 +1,75 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public interface Binding extends ConfiguredObject
+{
+
+ public String MATCHED_BYTES = "matchedBytes";
+ public String MATCHED_MESSAGES = "matchedMessages";
+ public String STATE_CHANGED = "stateChanged";
+
+ public static final Collection<String> AVAILABLE_STATISTICS =
+ Collections.unmodifiableCollection(
+ Arrays.asList(
+ MATCHED_BYTES,
+ MATCHED_MESSAGES,
+ STATE_CHANGED));
+
+
+ public String ARGUMENTS = "arguments";
+ public String CREATED = "created";
+ public String DURABLE = "durable";
+ public String ID = "id";
+ public String LIFETIME_POLICY = "lifetimePolicy";
+ public String NAME = "name";
+ public String STATE = "state";
+ public String TIME_TO_LIVE = "timeToLive";
+ public String UPDATED = "updated";
+ public String QUEUE = "queue";
+ public String EXCHANGE = "exchange";
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableCollection(
+ Arrays.asList(ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ EXCHANGE,
+ QUEUE,
+ ARGUMENTS)
+ );
+
+
+
+ Map<String,Object> getArguments();
+
+ void delete();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
new file mode 100644
index 0000000000..6477633a9b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfigurationChangeListener.java
@@ -0,0 +1,38 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public interface ConfigurationChangeListener
+{
+ /**
+ * Inform the listener that the passed object has changed state
+ *
+ * @param object the object whose state has changed
+ * @param oldState the state prior to the change
+ * @param newState the state after the change
+ */
+ void stateChanged(ConfiguredObject object, State oldState, State newState);
+
+ void childAdded(ConfiguredObject object, ConfiguredObject child);
+
+ void childRemoved(ConfiguredObject object, ConfiguredObject child);
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
new file mode 100644
index 0000000000..fb47a54d0a
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -0,0 +1,229 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.security.AccessControlException;
+import java.util.Collection;
+import java.util.UUID;
+
+public interface ConfiguredObject
+{
+
+ /**
+ * Get the universally unique identifier for the object
+ *
+ * @return the objects id
+ */
+ UUID getId();
+
+ /**
+ * Get the name of the object
+ *
+ * @return the name of the object
+ */
+ String getName();
+
+
+ /**
+ * Attempt to change the name of the object
+ *
+ * Request a change to the name of the object. The caller must pass in the name it believes the object currently
+ * has. If the current name differes from this expected value, then no name change will occur
+ *
+ * @param currentName the name the caller believes the object to have
+ * @param desiredName the name the caller would like the object to have
+ * @return the new name for the object
+ * @throws IllegalStateException if the name of the object may not be changed in in the current state
+ * @throws AccessControlException if the current context does not have permission to change the name
+ * @throws IllegalArgumentException if the provided name is not legal
+ * @throws NullPointerException if the desired name is null
+ */
+ String setName(String currentName, String desiredName) throws IllegalStateException,
+ AccessControlException;
+
+
+ /**
+ * Get the desired state of the object.
+ *
+ * This is the state set at the object itself, however the object
+ * may not be able attain this state if one of its ancestors is in a different state (in particular a descendant
+ * object may not be ACTIVE if all of its ancestors are not also ACTIVE).
+ *
+ * @return the desired state of the object
+ */
+ State getDesiredState();
+
+ /**
+ * Change the desired state of the object
+ *
+ * Request a change to the current state. The caller must pass in the state it believe the object to be in, if
+ * this differs from the current desired state when the object evalues the request, then no state change will occur.
+ *
+ * @param currentState the state the caller believes the object to be in
+ * @param desiredState the state the caller wishes the object to attain
+ * @return the new current state
+ * @throws IllegalStateTransitionException the requested state tranisition is invalid
+ * @throws AccessControlException the current context does not have sufficeint permissions to change the state
+ */
+ State setDesiredState(State currentState, State desiredState) throws IllegalStateTransitionException,
+ AccessControlException;
+
+ /**
+ * Get the actual state of the object.
+ *
+ * This state is derived fromt the desired state of the object itself and
+ * the actual state of its parents. If an object "desires" to be ACTIVE, but one of its parents is STOPPED, then
+ * the actual state of the object will be STOPPED
+ *
+ * @return the actual state of the object
+ */
+ State getActualState();
+
+
+ /**
+ * Add a listener which will be informed of all changes to this configuration object
+ *
+ * @param listener the listener to add
+ */
+ void addChangeListener(ConfigurationChangeListener listener);
+
+ /**
+ * Remove a change listener
+ *
+ *
+ * @param listener the listener to remove
+ * @return true iff a listener was removed
+ */
+ boolean removeChangeListener(ConfigurationChangeListener listener);
+
+ /**
+ * Get the parent of the given type for this object
+ *
+ * @param clazz the class of parent being asked for
+ * @return the objects parent
+ */
+ <T extends ConfiguredObject> T getParent(Class<T> clazz);
+
+
+ /**
+ * Returns whether the the object configuration is durably stored
+ *
+ * @return the durablity
+ */
+ boolean isDurable();
+
+ /**
+ * Sets the durability of the object
+ *
+ * @param durable true iff the caller wishes the object to store its configuration durably
+ *
+ * @throws IllegalStateException if the durability cannot be changed in the current state
+ * @throws AccessControlException if the current context does not have sufficient permission to change the durability
+ * @throws IllegalArgumentException if the object does not support the requested durability
+ */
+ void setDurable(boolean durable) throws IllegalStateException,
+ AccessControlException,
+ IllegalArgumentException;
+
+ /**
+ * Return the lifetime policy for the object
+ *
+ * @return the lifetime policy
+ */
+ LifetimePolicy getLifetimePolicy();
+
+ /**
+ * Set the lifetime policy of the object
+ *
+ * @param expected The lifetime policy the caller believes the object currently has
+ * @param desired The lifetime policy the caller desires the object to have
+ * @return the new lifetime policy
+ * @throws IllegalStateException if the lifetime policy cannot be changed in the current state
+ * @throws AccessControlException if the caller does not have permission to change the lifetime policy
+ * @throws IllegalArgumentException if the object does not support the requested lifetime policy
+ */
+ LifetimePolicy setLifetimePolicy(LifetimePolicy expected, LifetimePolicy desired) throws IllegalStateException,
+ AccessControlException,
+ IllegalArgumentException;
+
+ /**
+ * Get the time the object will live once the lifetime policy conditions are no longer fulfilled
+ *
+ * @return the time to live
+ */
+ long getTimeToLive();
+
+ /**
+ * Set the ttl value
+ *
+ * @param expected the ttl the caller believes the object currently has
+ * @param desired the ttl value the caller
+ * @return the new ttl value
+ * @throws IllegalStateException if the ttl cannot be set in the current state
+ * @throws AccessControlException if the caller does not have permission to change the ttl
+ * @throws IllegalArgumentException if the object does not support the requested ttl value
+ */
+ long setTimeToLive(long expected, long desired) throws IllegalStateException,
+ AccessControlException,
+ IllegalArgumentException;
+
+ /**
+ * Get the names of attributes that are set on this object
+ *
+ * Not that the returned collection is correct at the time the method is called, but will not reflect future
+ * additions or removals when they occur
+ *
+ * @return the collection of attribute names
+ */
+ Collection<String> getAttributeNames();
+
+
+ /**
+ * Return the value for the given attribute
+ *
+ * @param name the name of the attribute
+ * @return the value of the attribute at the object (or null if the attribute is not set
+ */
+ Object getAttribute(String name);
+
+ /**
+ * Set the value of an attribute
+ *
+ * @param name the name of the attribute to be set
+ * @param expected the value the caller believes the attribute currently has (or null if it is expected to be unset)
+ * @param desired the desired value for the attribute (or null to unset the attribute)
+ * @return the new value for the given attribute
+ * @throws IllegalStateException if the attribute cannot be set while the object is in its current state
+ * @throws AccessControlException if the caller does not have permission to alter the value of the attribute
+ * @throws IllegalArgumentException if the provided value is not valid for the given argument
+ */
+ Object setAttribute(String name, Object expected, Object desired) throws IllegalStateException,
+ AccessControlException,
+ IllegalArgumentException;
+
+
+ /**
+ * Return the Statistics holder for the ConfiguredObject
+ *
+ * @return the Statistics holder for the ConfiguredObject (or null if none exists)
+ */
+ Statistics getStatistics();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java
new file mode 100644
index 0000000000..958177e713
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Consumer.java
@@ -0,0 +1,73 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+public interface Consumer extends ConfiguredObject
+{
+ public String DISTRIBUTION_MODE = "distributionMode";
+ public String EXCLUSIVE = "exclusive";
+ public String NO_LOCAL = "noLocal";
+ public String SELECTOR = "selector";
+ public String SETTLEMENT_MODE = "settlementMode";
+ public String CREATED = "created";
+ public String DURABLE = "durable";
+ public String ID = "id";
+ public String LIFETIME_POLICY = "lifetimePolicy";
+ public String NAME = "name";
+ public String STATE = "state";
+ public String TIME_TO_LIVE = "timeToLive";
+ public String UPDATED = "updated";
+
+ public Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableCollection(
+ Arrays.asList(ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ DISTRIBUTION_MODE,
+ SETTLEMENT_MODE,
+ EXCLUSIVE,
+ NO_LOCAL,
+ SELECTOR));
+
+ public String BYTES_OUT = "bytesOut";
+ public String MESSAGES_OUT = "messagesOut";
+ public String STATE_CHANGED = "stateChanged";
+ public String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes";
+ public String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages";
+
+ public Collection<String> AVAILABLE_STATISTICS =
+ Collections.unmodifiableCollection(
+ Arrays.asList(BYTES_OUT,
+ MESSAGES_OUT,
+ STATE_CHANGED,
+ UNACKNOWLEDGED_BYTES,
+ UNACKNOWLEDGED_MESSAGES)
+ );
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
new file mode 100644
index 0000000000..e872273d05
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Exchange.java
@@ -0,0 +1,91 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Map;
+
+public interface Exchange extends ConfiguredObject
+{
+ String BINDING_COUNT = "bindingCount";
+ String BYTES_DROPPED = "bytesDropped";
+ String BYTES_IN = "bytesIn";
+ String MESSAGES_DROPPED = "messagesDropped";
+ String MESSAGES_IN = "messagesIn";
+ String PRODUCER_COUNT = "producerCount";
+ String STATE_CHANGED = "stateChanged";
+
+ public static final Collection<String> AVAILABLE_STATISTICS =
+ Collections.unmodifiableList(
+ Arrays.asList(BINDING_COUNT,
+ BYTES_DROPPED,
+ BYTES_IN,
+ MESSAGES_DROPPED,
+ MESSAGES_IN,
+ PRODUCER_COUNT,
+ STATE_CHANGED));
+
+ String CREATED = "created";
+ String DURABLE = "durable";
+ String ID = "id";
+ String LIFETIME_POLICY = "lifetimePolicy";
+ String NAME = "name";
+ String STATE = "state";
+ String TIME_TO_LIVE = "timeToLive";
+ String UPDATED = "updated";
+ String ALTERNATE_EXCHANGE = "alternateExchange";
+ String TYPE = "type";
+
+ // Attributes
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(
+ ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ ALTERNATE_EXCHANGE,
+ TYPE
+ ));
+
+ String getExchangeType();
+
+ //children
+ Collection<Binding> getBindings();
+ Collection<Publisher> getPublishers();
+
+ //operations
+ Binding createBinding(String bindingKey,
+ Queue queue,
+ Map<String,Object> bindingArguments,
+ Map<String, Object> attributes);
+
+
+ // Statistics
+
+ void delete();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java
new file mode 100644
index 0000000000..9cab5e2103
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/IllegalStateTransitionException.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public class IllegalStateTransitionException extends RuntimeException
+{
+ public IllegalStateTransitionException()
+ {
+ }
+
+ public IllegalStateTransitionException(final String message)
+ {
+ super(message);
+ }
+
+ public IllegalStateTransitionException(final String message, final Throwable cause)
+ {
+ super(message, cause);
+ }
+
+ public IllegalStateTransitionException(final Throwable cause)
+ {
+ super(cause);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
new file mode 100644
index 0000000000..c9006f4e71
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/LifetimePolicy.java
@@ -0,0 +1,27 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum LifetimePolicy
+{
+ PERMANENT,
+ AUTO_DELETE
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java
new file mode 100644
index 0000000000..cdb85d8023
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Publisher.java
@@ -0,0 +1,25 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public interface Publisher extends ConfiguredObject
+{
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
new file mode 100644
index 0000000000..7c4f0de22b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Queue.java
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+import org.apache.qpid.server.queue.QueueEntryVisitor;
+
+
+public interface Queue extends ConfiguredObject
+{
+ public static final String BINDING_COUNT = "bindingCount";
+ public static final String CONSUMER_COUNT = "consumerCount";
+ public static final String CONSUMER_COUNT_WITH_CREDIT = "consumerCountWithCredit";
+ public static final String DISCARDS_TTL_BYTES = "discardsTtlBytes";
+ public static final String DISCARDS_TTL_MESSAGES = "discardsTtlMessages";
+ public static final String PERSISTENT_DEQUEUED_BYTES = "persistentDequeuedBytes";
+ public static final String PERSISTENT_DEQUEUED_MESSAGES = "persistentDequeuedMessages";
+ public static final String PERSISTENT_ENQUEUED_BYTES = "persistentEnqueuedBytes";
+ public static final String PERSISTENT_ENQUEUED_MESSAGES = "persistentEnqueuedMessages";
+ public static final String QUEUE_DEPTH_BYTES = "queueDepthBytes";
+ public static final String QUEUE_DEPTH_MESSAGES = "queueDepthMessages";
+ public static final String STATE_CHANGED = "stateChanged";
+ public static final String TOTAL_DEQUEUED_BYTES = "totalDequeuedBytes";
+ public static final String TOTAL_DEQUEUED_MESSAGES = "totalDequeuedMessages";
+ public static final String TOTAL_ENQUEUED_BYTES = "totalEnqueuedBytes";
+ public static final String TOTAL_ENQUEUED_MESSAGES = "totalEnqueuedMessages";
+ public static final String UNACKNOWLEDGED_BYTES = "unacknowledgedBytes";
+ public static final String UNACKNOWLEDGED_MESSAGES = "unacknowledgedMessages";
+
+ public static final Collection<String> AVAILABLE_STATISTICS =
+ Collections.unmodifiableList(
+ Arrays.asList(BINDING_COUNT,
+ CONSUMER_COUNT,
+ CONSUMER_COUNT_WITH_CREDIT,
+ DISCARDS_TTL_BYTES,
+ DISCARDS_TTL_MESSAGES,
+ PERSISTENT_DEQUEUED_BYTES,
+ PERSISTENT_DEQUEUED_MESSAGES,
+ PERSISTENT_ENQUEUED_BYTES,
+ PERSISTENT_ENQUEUED_MESSAGES,
+ QUEUE_DEPTH_BYTES,
+ QUEUE_DEPTH_MESSAGES,
+ STATE_CHANGED,
+ TOTAL_DEQUEUED_BYTES,
+ TOTAL_DEQUEUED_MESSAGES,
+ TOTAL_ENQUEUED_BYTES,
+ TOTAL_ENQUEUED_MESSAGES,
+ UNACKNOWLEDGED_BYTES,
+ UNACKNOWLEDGED_MESSAGES));
+
+
+
+ public static final String ID = "id";
+ public static final String NAME = "name";
+ public static final String STATE = "state";
+ public static final String DURABLE = "durable";
+ public static final String LIFETIME_POLICY = "lifetimePolicy";
+ public static final String TIME_TO_LIVE = "timeToLive";
+ public static final String CREATED = "created";
+ public static final String UPDATED = "updated";
+
+ public static final String ALERT_REPEAT_GAP = "alertRepeatGap";
+ public static final String ALERT_THRESHOLD_MESSAGE_AGE = "alertThresholdMessageAge";
+ public static final String ALERT_THRESHOLD_MESSAGE_SIZE = "alertThresholdMessageSize";
+ public static final String ALERT_THRESHOLD_QUEUE_DEPTH_BYTES = "alertThresholdQueueDepthBytes";
+ public static final String ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES = "alertThresholdQueueDepthMessages";
+ public static final String ALTERNATE_EXCHANGE = "alternateExchange";
+ public static final String EXCLUSIVE = "exclusive";
+ public static final String MESSAGE_GROUP_KEY = "messageGroupKey";
+ public static final String MESSAGE_GROUP_DEFAULT_GROUP = "messageGroupDefaultGroup";
+ public static final String MESSAGE_GROUP_SHARED_GROUPS = "messageGroupSharedGroups";
+ public static final String LVQ_KEY = "lvqKey";
+ public static final String MAXIMUM_DELIVERY_ATTEMPTS = "maximumDeliveryAttempts";
+ public static final String NO_LOCAL = "noLocal";
+ public static final String OWNER = "owner";
+ public static final String QUEUE_FLOW_CONTROL_SIZE_BYTES = "queueFlowControlSizeBytes";
+ public static final String QUEUE_FLOW_RESUME_SIZE_BYTES = "queueFlowResumeSizeBytes";
+ public static final String QUEUE_FLOW_STOPPED = "queueFlowStopped";
+ public static final String SORT_KEY = "sortKey";
+ public static final String TYPE = "type";
+
+
+
+
+ public static final Collection<String> AVAILABLE_ATTRIBUTES =
+ Collections.unmodifiableList(
+ Arrays.asList(ID,
+ NAME,
+ STATE,
+ DURABLE,
+ LIFETIME_POLICY,
+ TIME_TO_LIVE,
+ CREATED,
+ UPDATED,
+ TYPE,
+ ALTERNATE_EXCHANGE,
+ EXCLUSIVE,
+ OWNER,
+ NO_LOCAL,
+ LVQ_KEY,
+ SORT_KEY,
+ MESSAGE_GROUP_KEY,
+ MESSAGE_GROUP_DEFAULT_GROUP,
+ MESSAGE_GROUP_SHARED_GROUPS,
+ MAXIMUM_DELIVERY_ATTEMPTS,
+ QUEUE_FLOW_CONTROL_SIZE_BYTES,
+ QUEUE_FLOW_RESUME_SIZE_BYTES,
+ QUEUE_FLOW_STOPPED,
+ ALERT_THRESHOLD_MESSAGE_AGE,
+ ALERT_THRESHOLD_MESSAGE_SIZE,
+ ALERT_THRESHOLD_QUEUE_DEPTH_BYTES,
+ ALERT_THRESHOLD_QUEUE_DEPTH_MESSAGES,
+ ALERT_REPEAT_GAP
+ ));
+
+ //children
+ Collection<Binding> getBindings();
+ Collection<Consumer> getConsumers();
+
+
+ //operations
+
+ void visit(QueueEntryVisitor visitor);
+
+ void delete();
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java
new file mode 100644
index 0000000000..a73b2c9d3e
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/State.java
@@ -0,0 +1,30 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+public enum State
+{
+ INITIALISING,
+ QUIESCED,
+ STOPPED,
+ ACTIVE,
+ DELETED
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
new file mode 100644
index 0000000000..2cb81eae82
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/Statistics.java
@@ -0,0 +1,25 @@
+package org.apache.qpid.server.model;
+
+import java.util.Collection;
+
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+public interface Statistics
+{
+ Collection<String> getStatisticNames();
+ public Object getStatistic(String name);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java
new file mode 100644
index 0000000000..d8493c6df4
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/model/UUIDGenerator.java
@@ -0,0 +1,54 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.UUID;
+
+import org.apache.qpid.exchange.ExchangeDefaults;
+
+
+public class UUIDGenerator
+{
+
+ public static UUID generateUUID()
+ {
+ return UUID.randomUUID();
+ }
+
+ public static UUID generateUUID(String objectName, String virtualHostName)
+ {
+ StringBuilder sb = new StringBuilder();
+ sb.append(virtualHostName).append(objectName);
+ return UUID.nameUUIDFromBytes(sb.toString().getBytes());
+ }
+
+ public static UUID generateExchangeUUID(String echangeName, String virtualHostName)
+ {
+ if(ExchangeDefaults.DEFAULT_EXCHANGE_NAME.asString().equals(echangeName) || echangeName.startsWith("amq.") || echangeName.startsWith("qpid."))
+ {
+ return generateUUID(echangeName, virtualHostName);
+ }
+ else
+ {
+ return generateUUID();
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index 263ed6d8cc..8d227d9677 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -62,6 +62,7 @@ import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueEntry;
@@ -199,6 +200,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
queue = AMQQueueFactory.createAMQQueueImpl(
+ UUIDGenerator.generateUUID(),
name,
isDurable,
null,
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 3f7eb18989..ef298b4731 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -37,6 +37,7 @@ import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.registry.IApplicationRegistry;
@@ -314,7 +315,8 @@ public class Session_1_0 implements SessionEventListener
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
- final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl(queueName,
+ final AMQQueue tempQueue = queue = AMQQueueFactory.createAMQQueueImpl( UUIDGenerator.generateUUID(),
+ queueName,
false, // durable
null, // owner
false, // autodelete
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
index f6bf6626a0..46c2a635b7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQPriorityQueue.java
@@ -23,19 +23,20 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
+import java.util.UUID;
public class AMQPriorityQueue extends OutOfOrderQueue
{
- protected AMQPriorityQueue(final String name,
+ protected AMQPriorityQueue(UUID id,
+ final String name,
final boolean durable,
final String owner,
final boolean autoDelete,
boolean exclusive,
final VirtualHost virtualHost,
- Map<String, Object> arguments,
- int priorities)
+ Map<String, Object> arguments, int priorities)
{
- super(name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
+ super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new PriorityQueueList.Factory(priorities), arguments);
}
public int getPriorities()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 8ff3f0148b..f2b7d7c56b 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.queue;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQSecurityException;
import org.apache.qpid.exchange.ExchangeDefaults;
@@ -30,12 +34,10 @@ import org.apache.qpid.server.configuration.ServerConfiguration;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.exchange.ExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.HashMap;
-import java.util.Map;
-
public class AMQQueueFactory
{
public static final String X_QPID_PRIORITIES = "x-qpid-priorities";
@@ -166,8 +168,13 @@ public class AMQQueueFactory
}
};
-
- /** @see #createAMQQueueImpl(String, boolean, String, boolean, boolean, VirtualHost, Map) */
+ /**
+ * Creates a new queue with a random id.
+ *
+ * @see #createAMQQueueImpl(UUID, String, boolean, String, boolean, boolean, VirtualHost, Map)
+ * @deprecated because only called from unit tests
+ * */
+ @Deprecated
public static AMQQueue createAMQQueueImpl(AMQShortString name,
boolean durable,
AMQShortString owner,
@@ -175,22 +182,28 @@ public class AMQQueueFactory
boolean exclusive,
VirtualHost virtualHost, final FieldTable arguments) throws AMQException
{
- return createAMQQueueImpl(name == null ? null : name.toString(),
+ return createAMQQueueImpl(UUIDGenerator.generateUUID(),
+ name == null ? null : name.toString(),
durable,
owner == null ? null : owner.toString(),
autoDelete,
- exclusive,
- virtualHost, FieldTable.convertToMap(arguments));
+ exclusive, virtualHost, FieldTable.convertToMap(arguments));
}
-
- public static AMQQueue createAMQQueueImpl(String queueName,
+ /**
+ * @param id the id to use. If default then one is generated from queueName. TODO check correctness of calls that pass a null value.
+ */
+ public static AMQQueue createAMQQueueImpl(UUID id,
+ String queueName,
boolean durable,
String owner,
boolean autoDelete,
- boolean exclusive,
- VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
+ boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments) throws AMQSecurityException, AMQException
{
+ if (id == null)
+ {
+ throw new IllegalArgumentException("Queue id must not be null");
+ }
if (queueName == null)
{
throw new IllegalArgumentException("Queue name must not be null");
@@ -241,19 +254,19 @@ public class AMQQueueFactory
AMQQueue q;
if(sortingKey != null)
{
- q = new SortedQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
+ q = new SortedQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, sortingKey);
}
else if(conflationKey != null)
{
- q = new ConflationQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
+ q = new ConflationQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, conflationKey);
}
else if(priorities > 1)
{
- q = new AMQPriorityQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
+ q = new AMQPriorityQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments, priorities);
}
else
{
- q = new SimpleAMQQueue(queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
+ q = new SimpleAMQQueue(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, arguments);
}
//Register the new queue
@@ -287,7 +300,7 @@ public class AMQQueueFactory
if(dlExchange == null)
{
- dlExchange = exchangeFactory.createExchange(new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
+ dlExchange = exchangeFactory.createExchange(UUIDGenerator.generateUUID(dlExchangeName, virtualHost.getName()), new AMQShortString(dlExchangeName), ExchangeDefaults.FANOUT_EXCHANGE_CLASS, true, false, 0);
exchangeRegistry.registerExchange(dlExchange);
@@ -309,7 +322,7 @@ public class AMQQueueFactory
args.put(X_QPID_DLQ_ENABLED, false);
args.put(X_QPID_MAXIMUM_DELIVERY_COUNT, 0);
- dlQueue = createAMQQueueImpl(dlQueueName, true, owner, false, exclusive, virtualHost, args);
+ dlQueue = createAMQQueueImpl(UUIDGenerator.generateUUID(dlQueueName, virtualHost.getName()), dlQueueName, true, owner, false, exclusive, virtualHost, args);
//enter the dlq in the persistent store
virtualHost.getMessageStore().createQueue(dlQueue, FieldTable.convertToFieldTable(args));
@@ -364,7 +377,10 @@ public class AMQQueueFactory
arguments.put(X_QPID_DLQ_ENABLED, true);
}
- AMQQueue q = createAMQQueueImpl(queueName, durable, owner, autodelete, exclusive, host, arguments);
+ // we need queues that are defined in config to have deterministic ids.
+ UUID id = UUIDGenerator.generateUUID(queueName, host.getName());
+
+ AMQQueue q = createAMQQueueImpl(id, queueName, durable, owner, autodelete, exclusive, host, arguments);
q.configure(config);
return q;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
index 2c645cc555..c2813bb7a5 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/ConflationQueue.java
@@ -21,22 +21,23 @@
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-
import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.server.virtualhost.VirtualHost;
public class ConflationQueue extends SimpleAMQQueue
{
- protected ConflationQueue(String name,
+ protected ConflationQueue(UUID id,
+ String name,
boolean durable,
String owner,
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
- Map<String, Object> args,
- String conflationKey)
+ Map<String, Object> args, String conflationKey)
{
- super(name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
+ super(id, name, durable, owner, autoDelete, exclusive, virtualHost, new ConflationQueueList.Factory(conflationKey), args);
}
public String getConflationKey()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index 801fe55939..2493974d45 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -27,6 +27,7 @@ import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -95,4 +96,18 @@ public class DefaultQueueRegistry implements QueueRegistry
}
_queueMap.clear();
}
+
+ @Override
+ public synchronized AMQQueue getQueue(UUID queueId)
+ {
+ Collection<AMQQueue> queues = _queueMap.values();
+ for (AMQQueue queue : queues)
+ {
+ if (queue.getId().equals(queueId))
+ {
+ return queue;
+ }
+ }
+ return null;
+ }
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
index 53121fc031..89976ca16e 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java
@@ -5,15 +5,16 @@ import org.apache.qpid.server.subscription.SubscriptionList;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
+import java.util.UUID;
public abstract class OutOfOrderQueue extends SimpleAMQQueue
{
- protected OutOfOrderQueue(String name, boolean durable, String owner,
- boolean autoDelete, boolean exclusive, VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ protected OutOfOrderQueue(UUID id, String name, boolean durable,
+ String owner, boolean autoDelete, boolean exclusive,
+ VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
{
- super(name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+ super(id, name, durable, owner, autoDelete, exclusive, virtualHost, entryListFactory, arguments);
}
@Override
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
new file mode 100644
index 0000000000..1578d21321
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryVisitor.java
@@ -0,0 +1,22 @@
+package org.apache.qpid.server.queue;
+
+/**
+* Licensed to the Apache Software Foundation (ASF) under one or more
+* contributor license agreements. See the NOTICE file distributed with
+* this work for additional information regarding copyright ownership.
+* The ASF licenses this file to You under the Apache License, Version 2.0
+* (the "License"); you may not use this file except in compliance with
+* the License. You may obtain a copy of the License at
+* <p/>
+* http://www.apache.org/licenses/LICENSE-2.0
+* <p/>
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+*/
+public interface QueueEntryVisitor
+{
+ boolean visit(QueueEntry entry);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 1ffc0a3560..72a54c9889 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -24,6 +24,7 @@ import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Collection;
+import java.util.UUID;
public interface QueueRegistry
{
@@ -42,4 +43,6 @@ public interface QueueRegistry
AMQQueue getQueue(String queue);
void stopAllAndUnregisterMBeans();
+
+ AMQQueue getQueue(UUID queueId);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index e6f059a875..d7eb304c92 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -191,29 +191,29 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
private int _maximumDeliveryCount = ApplicationRegistry.getInstance().getConfiguration().getMaxDeliveryCount();
private final MessageGroupManager _messageGroupManager;
- protected SimpleAMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
+ protected SimpleAMQQueue(UUID id, AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String,Object> arguments)
{
- this(name, durable, owner, autoDelete, exclusive, virtualHost,new SimpleQueueEntryList.Factory(), arguments);
+ this(id, name, durable, owner, autoDelete, exclusive,virtualHost, new SimpleQueueEntryList.Factory(), arguments);
}
- public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
+ public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, Map<String, Object> arguments)
{
- this(queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
+ this(id, queueName, durable, owner, autoDelete, exclusive, virtualHost, new SimpleQueueEntryList.Factory(), arguments);
}
- public SimpleAMQQueue(String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
+ public SimpleAMQQueue(UUID id, String queueName, boolean durable, String owner, boolean autoDelete, boolean exclusive, VirtualHost virtualHost, QueueEntryListFactory entryListFactory, Map<String, Object> arguments)
{
- this(queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
+ this(id, queueName == null ? null : new AMQShortString(queueName), durable, owner == null ? null : new AMQShortString(owner), autoDelete, exclusive, virtualHost, entryListFactory, arguments);
}
- protected SimpleAMQQueue(AMQShortString name,
+ protected SimpleAMQQueue(UUID id,
+ AMQShortString name,
boolean durable,
AMQShortString owner,
boolean autoDelete,
boolean exclusive,
VirtualHost virtualHost,
- QueueEntryListFactory entryListFactory,
- Map<String,Object> arguments)
+ QueueEntryListFactory entryListFactory, Map<String,Object> arguments)
{
if (name == null)
@@ -236,7 +236,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
_entries = entryListFactory.createQueueEntryList(this);
_arguments = arguments;
- _id = virtualHost.getConfigStore().createId();
+ _id = id;
_asyncDelivery = ReferenceCountingExecutorService.getInstance().acquireExecutorService();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
index 446f57b142..b3566df0c4 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SortedQueue.java
@@ -24,6 +24,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
import java.util.Map;
+import java.util.UUID;
public class SortedQueue extends OutOfOrderQueue
{
@@ -33,12 +34,12 @@ public class SortedQueue extends OutOfOrderQueue
private final Object _sortedQueueLock = new Object();
private final String _sortedPropertyName;
- protected SortedQueue(final String name, final boolean durable,
- final String owner, final boolean autoDelete, final boolean exclusive,
- final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
+ protected SortedQueue(UUID id, final String name,
+ final boolean durable, final String owner, final boolean autoDelete,
+ final boolean exclusive, final VirtualHost virtualHost, Map<String, Object> arguments, String sortedPropertyName)
{
- super(name, durable, owner, autoDelete, exclusive, virtualHost,
- new SortedQueueEntryListFactory(sortedPropertyName), arguments);
+ super(id, name, durable, owner, autoDelete, exclusive,
+ virtualHost, new SortedQueueEntryListFactory(sortedPropertyName), arguments);
this._sortedPropertyName = sortedPropertyName;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
index 73f127e097..1307b1dbd4 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfigurationRecoveryHandler.java
@@ -32,19 +32,19 @@ public interface ConfigurationRecoveryHandler
public static interface QueueRecoveryHandler
{
- void queue(String queueName, String owner, boolean exclusive, FieldTable arguments);
+ void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments);
ExchangeRecoveryHandler completeQueueRecovery();
}
public static interface ExchangeRecoveryHandler
{
- void exchange(String exchangeName, String type, boolean autoDelete);
+ void exchange(UUID id, String exchangeName, String type, boolean autoDelete);
BindingRecoveryHandler completeExchangeRecovery();
}
public static interface BindingRecoveryHandler
{
- void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf);
+ void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingName, ByteBuffer buf);
BrokerLinkRecoveryHandler completeBindingRecovery();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
new file mode 100644
index 0000000000..1a67fdf540
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectHelper.java
@@ -0,0 +1,183 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.util.MapJsonSerializer;
+
+public class ConfiguredObjectHelper
+{
+ /**
+ * Name of queue attribute to store queue creation arguments.
+ * <p>
+ * This attribute is not defined yet on Queue configured object interface.
+ */
+ private static final String QUEUE_ARGUMENTS = "ARGUMENTS";
+
+ private MapJsonSerializer _serializer = new MapJsonSerializer();
+
+ public void loadQueue(ConfiguredObjectRecord configuredObject, QueueRecoveryHandler qrh)
+ {
+ if (Queue.class.getName().equals(configuredObject.getType()))
+ {
+ Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
+ String queueName = (String) attributeMap.get(Queue.NAME);
+ String owner = (String) attributeMap.get(Queue.OWNER);
+ boolean exclusive = (Boolean) attributeMap.get(Queue.EXCLUSIVE);
+ @SuppressWarnings("unchecked")
+ Map<String, Object> queueArgumentsMap = (Map<String, Object>) attributeMap.get(QUEUE_ARGUMENTS);
+ FieldTable arguments = null;
+ if (queueArgumentsMap != null)
+ {
+ arguments = FieldTable.convertToFieldTable(queueArgumentsMap);
+ }
+ qrh.queue(configuredObject.getId(), queueName, owner, exclusive, arguments);
+ }
+ }
+
+ public ConfiguredObjectRecord updateQueueConfiguredObject(final AMQQueue queue, ConfiguredObjectRecord queueRecord)
+ {
+ Map<String, Object> attributesMap = _serializer.deserialize(queueRecord.getAttributes());
+ attributesMap.put(Queue.NAME, queue.getName());
+ attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ String newJson = _serializer.serialize(attributesMap);
+ ConfiguredObjectRecord newQueueRecord = new ConfiguredObjectRecord(queue.getId(), queueRecord.getType(), newJson);
+ return newQueueRecord;
+ }
+
+ public ConfiguredObjectRecord createQueueConfiguredObject(AMQQueue queue, FieldTable arguments)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Queue.NAME, queue.getName());
+ attributesMap.put(Queue.OWNER, AMQShortString.toString(queue.getOwner()));
+ attributesMap.put(Queue.EXCLUSIVE, queue.isExclusive());
+ if (arguments != null)
+ {
+ attributesMap.put(QUEUE_ARGUMENTS, FieldTable.convertToMap(arguments));
+ }
+ String json = _serializer.serialize(attributesMap);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(queue.getId(), Queue.class.getName(), json);
+ return configuredObject;
+ }
+
+ public void loadExchange(ConfiguredObjectRecord configuredObject, ExchangeRecoveryHandler erh)
+ {
+ if (Exchange.class.getName().equals(configuredObject.getType()))
+ {
+ Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
+ String exchangeName = (String) attributeMap.get(Exchange.NAME);
+ String exchangeType = (String) attributeMap.get(Exchange.TYPE);
+ String lifeTimePolicy = (String) attributeMap.get(Exchange.LIFETIME_POLICY);
+ boolean autoDelete = lifeTimePolicy == null
+ || LifetimePolicy.valueOf(lifeTimePolicy) == LifetimePolicy.AUTO_DELETE;
+ erh.exchange(configuredObject.getId(), exchangeName, exchangeType, autoDelete);
+ }
+ }
+
+ public ConfiguredObjectRecord createExchangeConfiguredObject(org.apache.qpid.server.exchange.Exchange exchange)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Exchange.NAME, exchange.getName());
+ attributesMap.put(Exchange.TYPE, AMQShortString.toString(exchange.getTypeShortString()));
+ attributesMap.put(Exchange.LIFETIME_POLICY, exchange.isAutoDelete() ? LifetimePolicy.AUTO_DELETE.name()
+ : LifetimePolicy.PERMANENT.name());
+ String json = _serializer.serialize(attributesMap);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(exchange.getId(), Exchange.class.getName(), json);
+ return configuredObject;
+ }
+
+ public void loadQueueBinding(ConfiguredObjectRecord configuredObject, BindingRecoveryHandler brh)
+ {
+ if (Binding.class.getName().equals(configuredObject.getType()))
+ {
+ Map<String, Object> attributeMap = _serializer.deserialize(configuredObject.getAttributes());
+ UUID exchangeId = UUID.fromString((String)attributeMap.get(Binding.EXCHANGE));
+ UUID queueId = UUID.fromString((String) attributeMap.get(Binding.QUEUE));
+ String bindingName = (String) attributeMap.get(Binding.NAME);
+
+ @SuppressWarnings("unchecked")
+ Map<String, Object> bindingArgumentsMap = (Map<String, Object>) attributeMap.get(Binding.ARGUMENTS);
+ FieldTable arguments = null;
+ if (bindingArgumentsMap != null)
+ {
+ arguments = FieldTable.convertToFieldTable(bindingArgumentsMap);
+ }
+ ByteBuffer argumentsBB = (arguments == null ? null : ByteBuffer.wrap(arguments.getDataAsBytes()));
+
+ brh.binding(configuredObject.getId(), exchangeId, queueId, bindingName, argumentsBB);
+ }
+ }
+
+ public ConfiguredObjectRecord createBindingConfiguredObject(org.apache.qpid.server.binding.Binding binding)
+ {
+ Map<String, Object> attributesMap = new HashMap<String, Object>();
+ attributesMap.put(Binding.NAME, binding.getBindingKey());
+ attributesMap.put(Binding.EXCHANGE, binding.getExchange().getId());
+ attributesMap.put(Binding.QUEUE, binding.getQueue().getId());
+ Map<String, Object> arguments = binding.getArguments();
+ if (arguments != null)
+ {
+ attributesMap.put(Binding.ARGUMENTS, arguments);
+ }
+ String json = _serializer.serialize(attributesMap);
+ ConfiguredObjectRecord configuredObject = new ConfiguredObjectRecord(binding.getId(), Binding.class.getName(), json);
+ return configuredObject;
+ }
+
+ public void recoverQueues(QueueRecoveryHandler qrh, List<ConfiguredObjectRecord> configuredObjects)
+ {
+ for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
+ {
+ loadQueue(configuredObjectRecord, qrh);
+ }
+ }
+
+ public void recoverExchanges(ExchangeRecoveryHandler erh, List<ConfiguredObjectRecord> configuredObjects)
+ {
+ for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
+ {
+ loadExchange(configuredObjectRecord, erh);
+ }
+ }
+
+ public void recoverBindings(BindingRecoveryHandler brh, List<ConfiguredObjectRecord> configuredObjects)
+ {
+ for (ConfiguredObjectRecord configuredObjectRecord : configuredObjects)
+ {
+ loadQueueBinding(configuredObjectRecord, brh);
+ }
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
new file mode 100644
index 0000000000..95e1713d78
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/ConfiguredObjectRecord.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.UUID;
+
+public class ConfiguredObjectRecord
+{
+ private UUID _id;
+ private String _type;
+ private String _attributes;
+
+ public ConfiguredObjectRecord(UUID id, String type, String attributes)
+ {
+ super();
+ _id = id;
+ _type = type;
+ _attributes = attributes;
+ }
+
+ public UUID getId()
+ {
+ return _id;
+ }
+
+ public void setId(UUID id)
+ {
+ _id = id;
+ }
+
+ public String getType()
+ {
+ return _type;
+ }
+
+ public String getAttributes()
+ {
+ return _attributes;
+ }
+
+ @Override
+ public String toString()
+ {
+ return "ConfiguredObjectRecord [id=" + _id + ", type=" + _type + ", attributes=" + _attributes + "]";
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
index 13c24e624e..655887e5c2 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/DurableConfigurationStore.java
@@ -23,8 +23,8 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
@@ -69,28 +69,22 @@ public interface DurableConfigurationStore
void removeExchange(Exchange exchange) throws AMQStoreException;
/**
- * Binds the specified queue to an exchange with a routing key.
+ * Store the queue binding.
*
- * @param exchange The exchange to bind to.
- * @param routingKey The routing key to bind by.
- * @param queue The queue to bind.
- * @param args Additional parameters.
+ * @param binding queue binding
*
* @throws AMQStoreException if the operation fails for any reason.
*/
- void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+ void bindQueue(Binding binding) throws AMQStoreException;
/**
- * Unbinds the specified from an exchange under a particular routing key.
+ * Removes queue binding
*
- * @param exchange The exchange to unbind from.
- * @param routingKey The routing key to unbind.
- * @param queue The queue to unbind.
- * @param args Additional parameters.
+ * @param binding queue binding to remove
*
* @throws AMQStoreException If the operation fails for any reason.
*/
- void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException;
+ void unbindQueue(Binding binding) throws AMQStoreException;
/**
* Makes the specified queue persistent.
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
index 77df6c5abf..34c7d2d933 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/NullMessageStore.java
@@ -21,8 +21,8 @@ package org.apache.qpid.server.store;
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQStoreException;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
@@ -48,12 +48,12 @@ public class NullMessageStore implements MessageStore
}
@Override
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ public void bindQueue(Binding binding) throws AMQStoreException
{
}
@Override
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQStoreException
+ public void unbindQueue(Binding binding) throws AMQStoreException
{
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
index b92d5f3e9b..bd4da648f9 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogRecoveryHandler.java
@@ -20,15 +20,17 @@
*/
package org.apache.qpid.server.store;
+import java.util.UUID;
+
public interface TransactionLogRecoveryHandler
{
QueueEntryRecoveryHandler begin(MessageStore log);
public static interface QueueEntryRecoveryHandler
{
- void queueEntry(String queuename, long messageId);
-
DtxRecordRecoveryHandler completeQueueEntryRecovery();
+
+ void queueEntry(UUID queueId, long messageId);
}
public static interface DtxRecordRecoveryHandler
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
index 0d81dd151d..576dca847d 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/TransactionLogResource.java
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.server.store;
+import java.util.UUID;
+
public interface TransactionLogResource
{
- public String getResourceName();
+ public UUID getId();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
index 07d8bb97f8..0371cdcfcb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/derby/DerbyMessageStore.java
@@ -29,6 +29,7 @@ import java.io.File;
import java.io.IOException;
import java.lang.ref.SoftReference;
import java.nio.ByteBuffer;
+import java.nio.charset.Charset;
import java.sql.Blob;
import java.sql.Connection;
import java.sql.Driver;
@@ -51,12 +52,15 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.federation.Bridge;
import org.apache.qpid.server.federation.BrokerLink;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.ConfiguredObjectHelper;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.EventManager;
@@ -89,12 +93,9 @@ public class DerbyMessageStore implements MessageStore
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
- private static final String EXCHANGE_TABLE_NAME = "QPID_EXCHANGE";
- private static final String QUEUE_TABLE_NAME = "QPID_QUEUE";
- private static final String BINDINGS_TABLE_NAME = "QPID_BINDINGS";
- private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRY";
+ private static final String QUEUE_ENTRY_TABLE_NAME = "QPID_QUEUE_ENTRIES";
- private static final String META_DATA_TABLE_NAME = "QPID_META_DATA";
+ private static final String META_DATA_TABLE_NAME = "QPID_MESSAGE_METADATA";
private static final String MESSAGE_CONTENT_TABLE_NAME = "QPID_MESSAGE_CONTENT";
private static final String LINKS_TABLE_NAME = "QPID_LINKS";
@@ -103,7 +104,9 @@ public class DerbyMessageStore implements MessageStore
private static final String XID_TABLE_NAME = "QPID_XIDS";
private static final String XID_ACTIONS_TABLE_NAME = "QPID_XID_ACTIONS";
- private static final int DB_VERSION = 3;
+ private static final String CONFIGURED_OBJECTS_TABLE_NAME = "QPID_CONFIGURED_OBJECTS";
+
+ private static final int DB_VERSION = 6;
@@ -119,38 +122,23 @@ public class DerbyMessageStore implements MessageStore
private static final String CREATE_DB_VERSION_TABLE = "CREATE TABLE "+DB_VERSION_TABLE_NAME+" ( version int not null )";
private static final String INSERT_INTO_DB_VERSION = "INSERT INTO "+DB_VERSION_TABLE_NAME+" ( version ) VALUES ( ? )";
- private static final String CREATE_EXCHANGE_TABLE = "CREATE TABLE "+EXCHANGE_TABLE_NAME+" ( name varchar(255) not null, type varchar(255) not null, autodelete SMALLINT not null, PRIMARY KEY ( name ) )";
- private static final String CREATE_QUEUE_TABLE = "CREATE TABLE "+QUEUE_TABLE_NAME+" ( name varchar(255) not null, owner varchar(255), exclusive SMALLINT not null, arguments blob, PRIMARY KEY ( name ))";
- private static final String CREATE_BINDINGS_TABLE = "CREATE TABLE "+BINDINGS_TABLE_NAME+" ( exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255) not null, arguments blob , PRIMARY KEY ( exchange_name, queue_name, binding_key ) )";
- private static final String SELECT_FROM_QUEUE = "SELECT name, owner, exclusive, arguments FROM " + QUEUE_TABLE_NAME;
- private static final String FIND_QUEUE = "SELECT name, owner FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
- private static final String UPDATE_QUEUE_EXCLUSIVITY = "UPDATE " + QUEUE_TABLE_NAME + " SET exclusive = ? WHERE name = ?";
- private static final String SELECT_FROM_EXCHANGE = "SELECT name, type, autodelete FROM " + EXCHANGE_TABLE_NAME;
- private static final String SELECT_FROM_BINDINGS =
- "SELECT exchange_name, queue_name, binding_key, arguments FROM " + BINDINGS_TABLE_NAME + " ORDER BY exchange_name";
- private static final String FIND_BINDING =
- "SELECT * FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ? ";
- private static final String INSERT_INTO_EXCHANGE = "INSERT INTO " + EXCHANGE_TABLE_NAME + " ( name, type, autodelete ) VALUES ( ?, ?, ? )";
- private static final String DELETE_FROM_EXCHANGE = "DELETE FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
- private static final String FIND_EXCHANGE = "SELECT name FROM " + EXCHANGE_TABLE_NAME + " WHERE name = ?";
- private static final String INSERT_INTO_BINDINGS = "INSERT INTO " + BINDINGS_TABLE_NAME + " ( exchange_name, queue_name, binding_key, arguments ) values ( ?, ?, ?, ? )";
- private static final String DELETE_FROM_BINDINGS = "DELETE FROM " + BINDINGS_TABLE_NAME + " WHERE exchange_name = ? AND queue_name = ? AND binding_key = ?";
- private static final String INSERT_INTO_QUEUE = "INSERT INTO " + QUEUE_TABLE_NAME + " (name, owner, exclusive, arguments) VALUES (?, ?, ?, ?)";
- private static final String DELETE_FROM_QUEUE = "DELETE FROM " + QUEUE_TABLE_NAME + " WHERE name = ?";
-
- private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_name varchar(255) not null, message_id bigint not null, PRIMARY KEY (queue_name, message_id) )";
- private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_name, message_id) values (?,?)";
- private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_name = ? AND message_id =?";
- private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_name, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_name, message_id";
-
-
- private static final String CREATE_META_DATA_TABLE = "CREATE TABLE "+META_DATA_TABLE_NAME+" ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
- private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE "+MESSAGE_CONTENT_TABLE_NAME+" ( message_id bigint not null, offset int not null, last_byte int not null, content blob , PRIMARY KEY (message_id, offset) )";
-
- private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME + "( message_id, offset, last_byte, content ) values (?, ?, ?, ?)";
- private static final String SELECT_FROM_MESSAGE_CONTENT =
- "SELECT offset, content FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ? AND last_byte > ? AND offset < ? ORDER BY message_id, offset";
- private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME + " WHERE message_id = ?";
+ private static final String CREATE_QUEUE_ENTRY_TABLE = "CREATE TABLE "+QUEUE_ENTRY_TABLE_NAME+" ( queue_id varchar(36) not null, message_id bigint not null, PRIMARY KEY (queue_id, message_id) )";
+ private static final String INSERT_INTO_QUEUE_ENTRY = "INSERT INTO " + QUEUE_ENTRY_TABLE_NAME + " (queue_id, message_id) values (?,?)";
+ private static final String DELETE_FROM_QUEUE_ENTRY = "DELETE FROM " + QUEUE_ENTRY_TABLE_NAME + " WHERE queue_id = ? AND message_id =?";
+ private static final String SELECT_FROM_QUEUE_ENTRY = "SELECT queue_id, message_id FROM " + QUEUE_ENTRY_TABLE_NAME + " ORDER BY queue_id, message_id";
+
+
+ private static final String CREATE_META_DATA_TABLE = "CREATE TABLE " + META_DATA_TABLE_NAME
+ + " ( message_id bigint not null, meta_data blob, PRIMARY KEY ( message_id ) )";
+ private static final String CREATE_MESSAGE_CONTENT_TABLE = "CREATE TABLE " + MESSAGE_CONTENT_TABLE_NAME
+ + " ( message_id bigint not null, content blob , PRIMARY KEY (message_id) )";
+
+ private static final String INSERT_INTO_MESSAGE_CONTENT = "INSERT INTO " + MESSAGE_CONTENT_TABLE_NAME
+ + "( message_id, content ) values (?, ?)";
+ private static final String SELECT_FROM_MESSAGE_CONTENT = "SELECT content FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
+ private static final String DELETE_FROM_MESSAGE_CONTENT = "DELETE FROM " + MESSAGE_CONTENT_TABLE_NAME
+ + " WHERE message_id = ?";
private static final String INSERT_INTO_META_DATA = "INSERT INTO " + META_DATA_TABLE_NAME + "( message_id , meta_data ) values (?, ?)";;
private static final String SELECT_FROM_META_DATA =
@@ -214,18 +202,32 @@ public class DerbyMessageStore implements MessageStore
private static final String CREATE_XID_ACTIONS_TABLE =
"CREATE TABLE "+XID_ACTIONS_TABLE_NAME+" ( format bigint not null,"
+ " global_id varchar(64) for bit data not null, branch_id varchar(64) for bit data not null, " +
- "action_type char not null, queue_name varchar(255) not null, message_id bigint not null" +
+ "action_type char not null, queue_id varchar(36) not null, message_id bigint not null" +
", PRIMARY KEY ( " +
- "format, global_id, branch_id, action_type, queue_name, message_id))";
+ "format, global_id, branch_id, action_type, queue_id, message_id))";
private static final String INSERT_INTO_XID_ACTIONS =
"INSERT INTO "+XID_ACTIONS_TABLE_NAME+" ( format, global_id, branch_id, action_type, " +
- "queue_name, message_id ) values (?,?,?,?,?,?) ";
+ "queue_id, message_id ) values (?,?,?,?,?,?) ";
private static final String DELETE_FROM_XID_ACTIONS = "DELETE FROM " + XID_ACTIONS_TABLE_NAME
+ " WHERE format = ? and global_id = ? and branch_id = ?";
private static final String SELECT_ALL_FROM_XID_ACTIONS =
- "SELECT action_type, queue_name, message_id FROM " + XID_ACTIONS_TABLE_NAME +
+ "SELECT action_type, queue_id, message_id FROM " + XID_ACTIONS_TABLE_NAME +
" WHERE format = ? and global_id = ? and branch_id = ?";
+ private static final String CREATE_CONFIGURED_OBJECTS_TABLE = "CREATE TABLE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id VARCHAR(36) not null, object_type varchar(255), attributes blob, PRIMARY KEY (id))";
+ private static final String INSERT_INTO_CONFIGURED_OBJECTS = "INSERT INTO " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " ( id, object_type, attributes) VALUES (?,?,?)";
+ private static final String UPDATE_CONFIGURED_OBJECTS = "UPDATE " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " set object_type =?, attributes = ? where id = ?";
+ private static final String DELETE_FROM_CONFIGURED_OBJECTS = "DELETE FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String FIND_CONFIGURED_OBJECT = "SELECT object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME
+ + " where id = ?";
+ private static final String SELECT_FROM_CONFIGURED_OBJECTS = "SELECT id, object_type, attributes FROM " + CONFIGURED_OBJECTS_TABLE_NAME;
+
+ private final Charset UTF8_CHARSET = Charset.forName("UTF-8");
+
private static final String DERBY_SINGLE_DB_SHUTDOWN_CODE = "08006";
private final StateManager _stateManager;
@@ -244,6 +246,8 @@ public class DerbyMessageStore implements MessageStore
_stateManager = new StateManager(_eventManager);
}
+ private ConfiguredObjectHelper _configuredObjectHelper = new ConfiguredObjectHelper();
+
@Override
public void configureConfigStore(String name,
ConfigurationRecoveryHandler configRecoveryHandler,
@@ -323,9 +327,7 @@ public class DerbyMessageStore implements MessageStore
Connection conn = newAutoCommitConnection();
createVersionTable(conn);
- createExchangeTable(conn);
- createQueueTable(conn);
- createBindingsTable(conn);
+ createConfiguredObjectsTable(conn);
createQueueEntryTable(conn);
createMetaDataTable(conn);
createMessageContentTable(conn);
@@ -366,15 +368,14 @@ public class DerbyMessageStore implements MessageStore
}
-
- private void createExchangeTable(final Connection conn) throws SQLException
+ private void createConfiguredObjectsTable(final Connection conn) throws SQLException
{
- if(!tableExists(EXCHANGE_TABLE_NAME, conn))
+ if(!tableExists(CONFIGURED_OBJECTS_TABLE_NAME, conn))
{
Statement stmt = conn.createStatement();
try
{
- stmt.execute(CREATE_EXCHANGE_TABLE);
+ stmt.execute(CREATE_CONFIGURED_OBJECTS_TABLE);
}
finally
{
@@ -383,39 +384,6 @@ public class DerbyMessageStore implements MessageStore
}
}
- private void createQueueTable(final Connection conn) throws SQLException
- {
- if(!tableExists(QUEUE_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_QUEUE_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
- }
-
- private void createBindingsTable(final Connection conn) throws SQLException
- {
- if(!tableExists(BINDINGS_TABLE_NAME, conn))
- {
- Statement stmt = conn.createStatement();
- try
- {
- stmt.execute(CREATE_BINDINGS_TABLE);
- }
- finally
- {
- stmt.close();
- }
- }
-
- }
-
private void createQueueEntryTable(final Connection conn) throws SQLException
{
if(!tableExists(QUEUE_ENTRY_TABLE_NAME, conn))
@@ -433,7 +401,7 @@ public class DerbyMessageStore implements MessageStore
}
- private void createMetaDataTable(final Connection conn) throws SQLException
+ private void createMetaDataTable(final Connection conn) throws SQLException
{
if(!tableExists(META_DATA_TABLE_NAME, conn))
{
@@ -558,26 +526,25 @@ public class DerbyMessageStore implements MessageStore
private void recoverConfiguration(ConfigurationRecoveryHandler recoveryHandler) throws AMQException
{
-
try
{
+ List<ConfiguredObjectRecord> configuredObjects = loadConfiguredObjects();
ConfigurationRecoveryHandler.QueueRecoveryHandler qrh = recoveryHandler.begin(this);
- recoverQueues(qrh);
+ _configuredObjectHelper.recoverQueues(qrh, configuredObjects);
ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh = qrh.completeQueueRecovery();
- List<String> exchanges = loadExchanges(erh);
+ _configuredObjectHelper.recoverExchanges(erh, configuredObjects);
+
ConfigurationRecoveryHandler.BindingRecoveryHandler brh = erh.completeExchangeRecovery();
- recoverBindings(brh, exchanges);
+ _configuredObjectHelper.recoverBindings(brh, configuredObjects);
+
ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh = brh.completeBindingRecovery();
recoverBrokerLinks(lrh);
}
catch (SQLException e)
{
-
throw new AMQStoreException("Error recovering persistent state: " + e.getMessage(), e);
}
-
-
}
private void recoverBrokerLinks(final ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler lrh)
@@ -718,176 +685,6 @@ public class DerbyMessageStore implements MessageStore
}
- private void recoverQueues(ConfigurationRecoveryHandler.QueueRecoveryHandler qrh) throws SQLException
- {
- Connection conn = newAutoCommitConnection();
- try
- {
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_QUEUE);
- try
- {
-
- while(rs.next())
- {
- String queueName = rs.getString(1);
- _logger.debug("Got queue " + queueName);
- String owner = rs.getString(2);
- boolean exclusive = rs.getBoolean(3);
- Blob argumentsAsBlob = rs.getBlob(4);
-
- byte[] dataAsBytes = argumentsAsBlob.getBytes(1,(int) argumentsAsBlob.length());
- FieldTable arguments;
- if(dataAsBytes.length > 0)
- {
-
- try
- {
- arguments = new FieldTable(new DataInputStream(new ByteArrayInputStream(dataAsBytes)),dataAsBytes.length);
- }
- catch (IOException e)
- {
- throw new RuntimeException("IO Exception should not be thrown",e);
- }
- }
- else
- {
- arguments = null;
- }
-
- qrh.queue(queueName, owner, exclusive, arguments);
-
- }
-
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
-
-
- private List<String> loadExchanges(ConfigurationRecoveryHandler.ExchangeRecoveryHandler erh) throws SQLException
- {
-
- List<String> exchanges = new ArrayList<String>();
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- Statement stmt = conn.createStatement();
- try
- {
- ResultSet rs = stmt.executeQuery(SELECT_FROM_EXCHANGE);
- try
- {
- while(rs.next())
- {
- String exchangeName = rs.getString(1);
- String type = rs.getString(2);
- boolean autoDelete = rs.getShort(3) != 0;
-
- exchanges.add(exchangeName);
-
- erh.exchange(exchangeName, type, autoDelete);
-
- }
- return exchanges;
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
-
- }
-
- private void recoverBindings(ConfigurationRecoveryHandler.BindingRecoveryHandler brh, List<String> exchanges) throws SQLException
- {
- _logger.info("Recovering bindings...");
-
- Connection conn = null;
- try
- {
- conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_BINDINGS);
-
- try
- {
- ResultSet rs = stmt.executeQuery();
-
- try
- {
-
- while(rs.next())
- {
- String exchangeName = rs.getString(1);
- String queueName = rs.getString(2);
- String bindingKey = rs.getString(3);
- Blob arguments = rs.getBlob(4);
- java.nio.ByteBuffer buf;
-
- if(arguments != null && arguments.length() != 0)
- {
- byte[] argumentBytes = arguments.getBytes(1, (int) arguments.length());
- buf = java.nio.ByteBuffer.wrap(argumentBytes);
- }
- else
- {
- buf = null;
- }
-
- brh.binding(exchangeName, queueName, bindingKey, buf);
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- if(conn != null)
- {
- conn.close();
- }
- }
- }
-
-
-
@Override
public void close() throws Exception
{
@@ -999,60 +796,8 @@ public class DerbyMessageStore implements MessageStore
{
if (_stateManager.isInState(State.ACTIVE))
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- try
- {
-
-
- PreparedStatement stmt = conn.prepareStatement(FIND_EXCHANGE);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this exchange
- if (!rs.next())
- {
-
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_EXCHANGE);
- try
- {
- insertStmt.setString(1, exchange.getName().toString());
- insertStmt.setString(2, exchange.getTypeShortString().asString());
- insertStmt.setShort(3, exchange.isAutoDelete() ? (short) 1 : (short) 0);
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
-
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing Exchange with name " + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createExchangeConfiguredObject(exchange);
+ insertConfiguredObject(configuredObject);
}
}
@@ -1060,150 +805,32 @@ public class DerbyMessageStore implements MessageStore
@Override
public void removeExchange(Exchange exchange) throws AMQStoreException
{
-
- try
+ int results = removeConfiguredObject(exchange.getId());
+ if (results == 0)
{
- Connection conn = newAutoCommitConnection();
- try
- {
- PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_EXCHANGE);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString());
- int results = stmt.executeUpdate();
- stmt.close();
- if(results == 0)
- {
- throw new AMQStoreException("Exchange " + exchange.getNameShortString() + " not found");
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting Exchange with name " + exchange.getNameShortString() + " from database: " + e.getMessage(), e);
+ throw new AMQStoreException("Exchange " + exchange.getName() + " with id " + exchange.getId() + " not found");
}
}
@Override
- public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ public void bindQueue(Binding binding)
throws AMQStoreException
{
if (_stateManager.isInState(State.ACTIVE))
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- try
- {
-
- PreparedStatement stmt = conn.prepareStatement(FIND_BINDING);
- try
- {
- stmt.setString(1, exchange.getNameShortString().toString() );
- stmt.setString(2, queue.getNameShortString().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
- ResultSet rs = stmt.executeQuery();
- try
- {
- // If this binding is not already in the store then create it.
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_BINDINGS);
- try
- {
- insertStmt.setString(1, exchange.getNameShortString().toString() );
- insertStmt.setString(2, queue.getNameShortString().toString());
- insertStmt.setString(3, routingKey == null ? null : routingKey.toString());
- if(args != null)
- {
- // TODO - In Java 6 we could use create/set Blob
- byte[] bytes = args.getDataAsBytes();
- ByteArrayInputStream bis = new ByteArrayInputStream(bytes);
- insertStmt.setBinaryStream(4, bis, bytes.length);
- }
- else
- {
- insertStmt.setNull(4, Types.BLOB);
- }
-
- insertStmt.executeUpdate();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " to database: " + e.getMessage(), e);
- }
-
+ ConfiguredObjectRecord configuredObject = _configuredObjectHelper.createBindingConfiguredObject(binding);
+ insertConfiguredObject(configuredObject);
}
-
-
}
@Override
- public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args)
+ public void unbindQueue(Binding binding)
throws AMQStoreException
{
- Connection conn = null;
- PreparedStatement stmt = null;
-
- try
- {
- conn = newAutoCommitConnection();
- // exchange_name varchar(255) not null, queue_name varchar(255) not null, binding_key varchar(255), arguments blob
- stmt = conn.prepareStatement(DELETE_FROM_BINDINGS);
- stmt.setString(1, exchange.getNameShortString().toString() );
- stmt.setString(2, queue.getNameShortString().toString());
- stmt.setString(3, routingKey == null ? null : routingKey.toString());
-
- int result = stmt.executeUpdate();
-
- if(result != 1)
- {
- throw new AMQStoreException("Queue binding for queue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error removing binding for AMQQueue with name " + queue.getNameShortString() + " to exchange "
- + exchange.getNameShortString() + " in database: " + e.getMessage(), e);
- }
- finally
+ int results = removeConfiguredObject(binding.getId());
+ if (results == 0)
{
- closePreparedStatement(stmt);
- closeConnection(conn);
+ throw new AMQStoreException("Binding " + binding + " not found");
}
}
@@ -1220,68 +847,8 @@ public class DerbyMessageStore implements MessageStore
if (_stateManager.isInState(State.ACTIVE))
{
- try
- {
- Connection conn = newAutoCommitConnection();
-
- PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
- try
- {
- stmt.setString(1, queue.getNameShortString().toString());
- ResultSet rs = stmt.executeQuery();
- try
- {
-
- // If we don't have any data in the result set then we can add this queue
- if (!rs.next())
- {
- PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_QUEUE);
-
- try
- {
- String owner = queue.getOwner() == null ? null : queue.getOwner().toString();
-
- insertStmt.setString(1, queue.getNameShortString().toString());
- insertStmt.setString(2, owner);
- insertStmt.setBoolean(3,queue.isExclusive());
-
- final byte[] underlying;
- if(arguments != null)
- {
- underlying = arguments.getDataAsBytes();
- }
- else
- {
- underlying = new byte[0];
- }
-
- ByteArrayInputStream bis = new ByteArrayInputStream(underlying);
- insertStmt.setBinaryStream(4,bis,underlying.length);
-
- insertStmt.execute();
- }
- finally
- {
- insertStmt.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- conn.close();
-
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error writing AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
- }
+ ConfiguredObjectRecord queueConfiguredObject = _configuredObjectHelper.createQueueConfiguredObject(queue, arguments);
+ insertConfiguredObject(queueConfiguredObject);
}
}
@@ -1299,54 +866,11 @@ public class DerbyMessageStore implements MessageStore
{
if (_stateManager.isInState(State.ACTIVE))
{
- try
+ ConfiguredObjectRecord queueConfiguredObject = loadConfiguredObject(queue.getId());
+ if (queueConfiguredObject != null)
{
- Connection conn = newAutoCommitConnection();
-
- try
- {
- PreparedStatement stmt = conn.prepareStatement(FIND_QUEUE);
- try
- {
- stmt.setString(1, queue.getNameShortString().toString());
-
- ResultSet rs = stmt.executeQuery();
- try
- {
- if (rs.next())
- {
- PreparedStatement stmt2 = conn.prepareStatement(UPDATE_QUEUE_EXCLUSIVITY);
- try
- {
- stmt2.setBoolean(1,queue.isExclusive());
- stmt2.setString(2, queue.getNameShortString().toString());
-
- stmt2.execute();
- }
- finally
- {
- stmt2.close();
- }
- }
- }
- finally
- {
- rs.close();
- }
- }
- finally
- {
- stmt.close();
- }
- }
- finally
- {
- conn.close();
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error updating AMQQueue with name " + queue.getNameShortString() + " to database: " + e.getMessage(), e);
+ ConfiguredObjectRecord newQueueRecord = _configuredObjectHelper.updateQueueConfiguredObject(queue, queueConfiguredObject);
+ updateConfiguredObject(newQueueRecord);
}
}
@@ -1410,31 +934,11 @@ public class DerbyMessageStore implements MessageStore
{
AMQShortString name = queue.getNameShortString();
_logger.debug("public void removeQueue(AMQShortString name = " + name + "): called");
- Connection conn = null;
- PreparedStatement stmt = null;
- try
+ int results = removeConfiguredObject(queue.getId());
+ if (results == 0)
{
- conn = newAutoCommitConnection();
- stmt = conn.prepareStatement(DELETE_FROM_QUEUE);
- stmt.setString(1, name.toString());
- int results = stmt.executeUpdate();
-
- if (results == 0)
- {
- throw new AMQStoreException("Queue " + name + " not found");
- }
- }
- catch (SQLException e)
- {
- throw new AMQStoreException("Error deleting AMQQueue with name " + name + " from database: " + e.getMessage(), e);
- }
- finally
- {
- closePreparedStatement(stmt);
- closeConnection(conn);
+ throw new AMQStoreException("Queue " + name + " with id " + queue.getId() + " not found");
}
-
-
}
@Override
@@ -1676,8 +1180,6 @@ public class DerbyMessageStore implements MessageStore
public void enqueueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- String name = queue.getResourceName();
-
Connection conn = connWrapper.getConnection();
@@ -1685,13 +1187,13 @@ public class DerbyMessageStore implements MessageStore
{
if (_logger.isDebugEnabled())
{
- _logger.debug("Enqueuing message " + messageId + " on queue " + name + "[Connection" + conn + "]");
+ _logger.debug("Enqueuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + queue.getId()+ "[Connection" + conn + "]");
}
PreparedStatement stmt = conn.prepareStatement(INSERT_INTO_QUEUE_ENTRY);
try
{
- stmt.setString(1,name);
+ stmt.setString(1, queue.getId().toString());
stmt.setLong(2,messageId);
stmt.executeUpdate();
}
@@ -1703,7 +1205,7 @@ public class DerbyMessageStore implements MessageStore
catch (SQLException e)
{
_logger.error("Failed to enqueue: " + e.getMessage(), e);
- throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + name
+ throw new AMQStoreException("Error writing enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" ) + " with id " + queue.getId()
+ " to database", e);
}
@@ -1711,8 +1213,6 @@ public class DerbyMessageStore implements MessageStore
public void dequeueMessage(ConnectionWrapper connWrapper, final TransactionLogResource queue, Long messageId) throws AMQStoreException
{
- String name = queue.getResourceName();
-
Connection conn = connWrapper.getConnection();
@@ -1722,7 +1222,7 @@ public class DerbyMessageStore implements MessageStore
PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_QUEUE_ENTRY);
try
{
- stmt.setString(1,name);
+ stmt.setString(1, queue.getId().toString());
stmt.setLong(2,messageId);
int results = stmt.executeUpdate();
@@ -1730,12 +1230,14 @@ public class DerbyMessageStore implements MessageStore
if(results != 1)
{
- throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + name);
+ throw new AMQStoreException("Unable to find message with id " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
}
if (_logger.isDebugEnabled())
{
- _logger.debug("Dequeuing message " + messageId + " on queue " + name );
+ _logger.debug("Dequeuing message " + messageId + " on queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId());
}
}
finally
@@ -1746,8 +1248,8 @@ public class DerbyMessageStore implements MessageStore
catch (SQLException e)
{
_logger.error("Failed to dequeue: " + e.getMessage(), e);
- throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + name
- + " from database", e);
+ throw new AMQStoreException("Error deleting enqueued message with id " + messageId + " for queue " + (queue instanceof AMQQueue ? ((AMQQueue)queue).getName() : "" )
+ + " with id " + queue.getId() + " from database", e);
}
}
@@ -1840,7 +1342,7 @@ public class DerbyMessageStore implements MessageStore
stmt.setString(4, "E");
for(Transaction.Record record : enqueues)
{
- stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setString(5, record.getQueue().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -1851,7 +1353,7 @@ public class DerbyMessageStore implements MessageStore
stmt.setString(4, "D");
for(Transaction.Record record : dequeues)
{
- stmt.setString(5, record.getQueue().getResourceName());
+ stmt.setString(5, record.getQueue().getId().toString());
stmt.setLong(6, record.getMessage().getMessageNumber());
stmt.executeUpdate();
}
@@ -2081,9 +1583,9 @@ public class DerbyMessageStore implements MessageStore
while(rs.next())
{
- String queueName = rs.getString(1);
+ String id = rs.getString(1);
long messageId = rs.getLong(2);
- queueEntryHandler.queueEntry(queueName,messageId);
+ queueEntryHandler.queueEntry(UUID.fromString(id), messageId);
}
}
finally
@@ -2137,13 +1639,13 @@ public class DerbyMessageStore implements MessageStore
private static class RecordImpl implements Transaction.Record, TransactionLogResource, EnqueableMessage
{
- private final String _queueName;
private long _messageNumber;
+ private UUID _queueId;
- public RecordImpl(String queueName, long messageNumber)
+ public RecordImpl(UUID queueId, long messageNumber)
{
- _queueName = queueName;
_messageNumber = messageNumber;
+ _queueId = queueId;
}
@Override
@@ -2177,9 +1679,9 @@ public class DerbyMessageStore implements MessageStore
}
@Override
- public String getResourceName()
+ public UUID getId()
{
- return _queueName;
+ return _queueId;
}
}
@@ -2237,10 +1739,10 @@ public class DerbyMessageStore implements MessageStore
{
String actionType = rs.getString(1);
- String queueName = rs.getString(2);
+ UUID queueId = UUID.fromString(rs.getString(2));
long messageId = rs.getLong(3);
- RecordImpl record = new RecordImpl(queueName, messageId);
+ RecordImpl record = new RecordImpl(queueId, messageId);
List<RecordImpl> records = "E".equals(actionType) ? enqueues : dequeues;
records.add(record);
}
@@ -2319,11 +1821,11 @@ public class DerbyMessageStore implements MessageStore
}
- private void addContent(Connection conn, long messageId, int offset, ByteBuffer src)
+ private void addContent(Connection conn, long messageId, ByteBuffer src)
{
if(_logger.isDebugEnabled())
{
- _logger.debug("Adding content chunk offset " + offset + " for message " +messageId);
+ _logger.debug("Adding content for message " +messageId);
}
PreparedStatement stmt = null;
@@ -2336,20 +1838,15 @@ public class DerbyMessageStore implements MessageStore
stmt = conn.prepareStatement(INSERT_INTO_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
- stmt.setInt(2, offset);
- stmt.setInt(3, offset+chunkData.length);
-
-
- // TODO in Java 6 we could just use blobs
ByteArrayInputStream bis = new ByteArrayInputStream(chunkData);
- stmt.setBinaryStream(4, bis, chunkData.length);
+ stmt.setBinaryStream(2, bis, chunkData.length);
stmt.executeUpdate();
}
catch (SQLException e)
{
closeConnection(conn);
- throw new RuntimeException("Error adding content chunk offset " + offset + " for message " + messageId + ": " + e.getMessage(), e);
+ throw new RuntimeException("Error adding content for message " + messageId + ": " + e.getMessage(), e);
}
finally
{
@@ -2370,33 +1867,32 @@ public class DerbyMessageStore implements MessageStore
stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
stmt.setLong(1,messageId);
- stmt.setInt(2, offset);
- stmt.setInt(3, offset+dst.remaining());
ResultSet rs = stmt.executeQuery();
int written = 0;
- while(rs.next())
+ if (rs.next())
{
- int offsetInMessage = rs.getInt(1);
- Blob dataAsBlob = rs.getBlob(2);
+
+ Blob dataAsBlob = rs.getBlob(1);
final int size = (int) dataAsBlob.length();
byte[] dataAsBytes = dataAsBlob.getBytes(1, size);
- int posInArray = offset + written - offsetInMessage;
- int count = size - posInArray;
- if(count > dst.remaining())
+ if (offset > size)
{
- count = dst.remaining();
+ throw new RuntimeException("Offset " + offset + " is greater than message size " + size
+ + " for message id " + messageId + "!");
+
}
- dst.put(dataAsBytes,posInArray,count);
- written+=count;
- if(dst.remaining() == 0)
+ written = size - offset;
+ if(written > dst.remaining())
{
- break;
+ written = dst.remaining();
}
+
+ dst.put(dataAsBytes, offset, written);
}
return written;
@@ -2635,7 +2131,7 @@ public class DerbyMessageStore implements MessageStore
try
{
storeMetaData(conn, _messageId, _metaData);
- DerbyMessageStore.this.addContent(conn, _messageId, 0,
+ DerbyMessageStore.this.addContent(conn, _messageId,
_data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
}
finally
@@ -2699,4 +2195,255 @@ public class DerbyMessageStore implements MessageStore
{
return _storeLocation;
}
+
+ private void insertConfiguredObject(ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ // If we don't have any data in the result set then we can add this configured object
+ if (!rs.next())
+ {
+ PreparedStatement insertStmt = conn.prepareStatement(INSERT_INTO_CONFIGURED_OBJECTS);
+ try
+ {
+ insertStmt.setString(1, configuredObject.getId().toString());
+ insertStmt.setString(2, configuredObject.getType());
+ if(configuredObject.getAttributes() == null)
+ {
+ insertStmt.setNull(3, Types.BLOB);
+ }
+ else
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ insertStmt.setBinaryStream(3, bis, attributesAsBytes.length);
+ }
+ insertStmt.execute();
+ }
+ finally
+ {
+ insertStmt.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error inserting of configured object " + configuredObject + " into database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private int removeConfiguredObject(UUID id) throws AMQStoreException
+ {
+ int results = 0;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(DELETE_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt.setString(1, id.toString());
+ results = stmt.executeUpdate();
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error deleting of configured object with id " + id + " from database: " + e.getMessage(), e);
+ }
+ return results;
+ }
+
+ private void updateConfiguredObject(final ConfiguredObjectRecord configuredObject) throws AMQStoreException
+ {
+ if (_stateManager.isInState(State.ACTIVE))
+ {
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, configuredObject.getId().toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ PreparedStatement stmt2 = conn.prepareStatement(UPDATE_CONFIGURED_OBJECTS);
+ try
+ {
+ stmt2.setString(1, configuredObject.getType());
+ if (configuredObject.getAttributes() != null)
+ {
+ byte[] attributesAsBytes = configuredObject.getAttributes().getBytes(UTF8_CHARSET);
+ ByteArrayInputStream bis = new ByteArrayInputStream(attributesAsBytes);
+ stmt2.setBinaryStream(2, bis, attributesAsBytes.length);
+ }
+ else
+ {
+ stmt2.setNull(2, Types.BLOB);
+ }
+ stmt2.setString(3, configuredObject.getId().toString());
+ stmt2.execute();
+ }
+ finally
+ {
+ stmt2.close();
+ }
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error updating configured object " + configuredObject + " in database: " + e.getMessage(), e);
+ }
+ }
+ }
+
+ private ConfiguredObjectRecord loadConfiguredObject(final UUID id) throws AMQStoreException
+ {
+ ConfiguredObjectRecord result = null;
+ try
+ {
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(FIND_CONFIGURED_OBJECT);
+ try
+ {
+ stmt.setString(1, id.toString());
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ if (rs.next())
+ {
+ String type = rs.getString(1);
+ Blob blob = rs.getBlob(2);
+ String attributes = null;
+ if (blob != null)
+ {
+ attributes = blobToString(blob);
+ }
+ result = new ConfiguredObjectRecord(id, type, attributes);
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ }
+ catch (SQLException e)
+ {
+ throw new AMQStoreException("Error loading of configured object with id " + id + " from database: "
+ + e.getMessage(), e);
+ }
+ return result;
+ }
+
+ private String blobToString(Blob blob) throws SQLException
+ {
+ byte[] bytes = blob.getBytes(1, (int)blob.length());
+ return new String(bytes, UTF8_CHARSET);
+ }
+
+ private List<ConfiguredObjectRecord> loadConfiguredObjects() throws SQLException
+ {
+ ArrayList<ConfiguredObjectRecord> results = new ArrayList<ConfiguredObjectRecord>();
+ Connection conn = newAutoCommitConnection();
+ try
+ {
+ PreparedStatement stmt = conn.prepareStatement(SELECT_FROM_CONFIGURED_OBJECTS);
+ try
+ {
+ ResultSet rs = stmt.executeQuery();
+ try
+ {
+ while (rs.next())
+ {
+ String id = rs.getString(1);
+ String objectType = rs.getString(2);
+ String attributes = blobToString(rs.getBlob(3));
+ results.add(new ConfiguredObjectRecord(UUID.fromString(id), objectType, attributes));
+ }
+ }
+ finally
+ {
+ rs.close();
+ }
+ }
+ finally
+ {
+ stmt.close();
+ }
+ }
+ finally
+ {
+ conn.close();
+ }
+ return results;
+ }
} \ No newline at end of file
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 54b91a0ce2..79a8bc0e4c 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.flow.WindowCreditManager;
import org.apache.qpid.server.logging.messages.ExchangeMessages;
import org.apache.qpid.server.message.MessageMetaData_0_10;
import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.BaseQueue;
@@ -683,13 +684,12 @@ public class ServerSessionDelegate extends SessionDelegate
{
String exchangeName = method.getExchange();
VirtualHost virtualHost = getVirtualHost(session);
- Exchange exchange = getExchange(session, exchangeName);
+ ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
//we must check for any unsupported arguments present and throw not-implemented
if(method.hasArguments())
{
Map<String,Object> args = method.getArguments();
-
//QPID-3392: currently we don't support any!
if(!args.isEmpty())
{
@@ -697,120 +697,113 @@ public class ServerSessionDelegate extends SessionDelegate
return;
}
}
-
- if(method.getPassive())
+ synchronized(exchangeRegistry)
{
- if(exchange == null)
- {
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '"+exchangeName+"'");
-
- }
- else
- {
- if(!exchange.getTypeShortString().toString().equals(method.getType()) && (method.getType() != null && method.getType().length() > 0))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: " + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() +".");
- }
- }
+ Exchange exchange = getExchange(session, exchangeName);
- }
- else
- {
- if (exchange == null)
+ if(method.getPassive())
{
- if(exchangeName.startsWith("amq."))
+ if(exchange == null)
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'amq.'.");
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "not-found: exchange-name '" + exchangeName + "'");
}
- else if(exchangeName.startsWith("qpid."))
+ else
{
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to declare exchange: " + exchangeName +
- " which begins with reserved prefix 'qpid.'.");
+ if (!exchange.getTypeShortString().toString().equals(method.getType())
+ && (method.getType() != null && method.getType().length() > 0))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to redeclare exchange: "
+ + exchangeName + " of type " + exchange.getTypeShortString() + " to " + method.getType() + ".");
+ }
}
- else
+ }
+ else
+ {
+ if (exchange == null)
{
- ExchangeRegistry exchangeRegistry = getExchangeRegistry(session);
- ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
-
-
-
- try
+ if (exchangeName.startsWith("amq."))
{
-
- exchange = exchangeFactory.createExchange(method.getExchange(),
- method.getType(),
- method.getDurable(),
- method.getAutoDelete());
-
- String alternateExchangeName = method.getAlternateExchange();
- boolean validAlternate;
- if(alternateExchangeName != null && alternateExchangeName.length() != 0)
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved prefix 'amq.'.");
+ }
+ else if (exchangeName.startsWith("qpid."))
+ {
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED, "Attempt to declare exchange: "
+ + exchangeName + " which begins with reserved prefix 'qpid.'.");
+ }
+ else
+ {
+ ExchangeFactory exchangeFactory = virtualHost.getExchangeFactory();
+ try
{
- Exchange alternate = getExchange(session, alternateExchangeName);
- if(alternate == null)
+ exchange = exchangeFactory.createExchange(method.getExchange(),
+ method.getType(),
+ method.getDurable(),
+ method.getAutoDelete());
+ String alternateExchangeName = method.getAlternateExchange();
+ boolean validAlternate;
+ if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
- validAlternate = false;
+ Exchange alternate = getExchange(session, alternateExchangeName);
+ if(alternate == null)
+ {
+ validAlternate = false;
+ }
+ else
+ {
+ exchange.setAlternateExchange(alternate);
+ validAlternate = true;
+ }
}
else
{
- exchange.setAlternateExchange(alternate);
validAlternate = true;
}
- }
- else
- {
- validAlternate = true;
- }
-
- if(validAlternate)
- {
- if (exchange.isDurable())
+ if(validAlternate)
{
- DurableConfigurationStore store = virtualHost.getMessageStore();
- store.createExchange(exchange);
+ if (exchange.isDurable())
+ {
+ DurableConfigurationStore store = virtualHost.getMessageStore();
+ store.createExchange(exchange);
+ }
+ exchangeRegistry.registerExchange(exchange);
}
-
- exchangeRegistry.registerExchange(exchange);
+ else
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + alternateExchangeName);
+ }
+ }
+ catch(AMQUnknownExchangeType e)
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
}
- else
+ catch (AMQException e)
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND,
- "Unknown alternate exchange " + alternateExchangeName);
+ exception(session, method, e, "Cannot declare exchange '" + exchangeName);
}
}
- catch(AMQUnknownExchangeType e)
+ }
+ else
+ {
+ if(!exchange.getTypeShortString().toString().equals(method.getType()))
{
- exception(session, method, ExecutionErrorCode.NOT_FOUND, "Unknown Exchange Type: " + method.getType());
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to redeclare exchange: " + exchangeName
+ + " of type " + exchange.getTypeShortString()
+ + " to " + method.getType() +".");
}
- catch (AMQException e)
+ else if(method.hasAlternateExchange()
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
{
- exception(session, method, e, "Cannot declare exchange '" + exchangeName);
+ exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
+ "Attempt to change alternate exchange of: " + exchangeName
+ + " from " + exchange.getAlternateExchange()
+ + " to " + method.getAlternateExchange() +".");
}
}
}
- else
- {
- if(!exchange.getTypeShortString().toString().equals(method.getType()))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to redeclare exchange: " + exchangeName
- + " of type " + exchange.getTypeShortString()
- + " to " + method.getType() +".");
- }
- else if(method.hasAlternateExchange()
- && (exchange.getAlternateExchange() == null ||
- !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
- {
- exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
- "Attempt to change alternate exchange of: " + exchangeName
- + " from " + exchange.getAlternateExchange()
- + " to " + method.getAlternateExchange() +".");
- }
- }
-
}
}
@@ -1396,8 +1389,8 @@ public class ServerSessionDelegate extends SessionDelegate
{
String owner = body.getExclusive() ? session.getClientID() : null;
- final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(queueName, body.getDurable(), owner, body.getAutoDelete(),
- body.getExclusive(), virtualHost, body.getArguments());
+ final AMQQueue queue = AMQQueueFactory.createAMQQueueImpl(UUIDGenerator.generateUUID(), queueName, body.getDurable(), owner,
+ body.getAutoDelete(), body.getExclusive(), virtualHost, body.getArguments());
return queue;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java
new file mode 100644
index 0000000000..2d9ba38555
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/util/MapJsonSerializer.java
@@ -0,0 +1,69 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.io.StringWriter;
+import java.util.Map;
+
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.type.TypeReference;
+
+public class MapJsonSerializer
+{
+ private static final TypeReference<Map<String, Object>> MAP_TYPE_REFERENCE = new TypeReference<Map<String, Object>>()
+ {
+ };
+
+ private ObjectMapper _mapper;
+
+ public MapJsonSerializer()
+ {
+ _mapper = new ObjectMapper();
+ }
+
+ public String serialize(Map<String, Object> attributeMap)
+ {
+ StringWriter stringWriter = new StringWriter();
+ try
+ {
+ _mapper.writeValue(stringWriter, attributeMap);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failure to serialize map:" + attributeMap, e);
+ }
+ return stringWriter.toString();
+ }
+
+ public Map<String, Object> deserialize(String json)
+ {
+ Map<String, Object> attributesMap = null;
+ try
+ {
+ attributesMap = _mapper.readValue(json, MAP_TYPE_REFERENCE);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException("Failure to deserialize json:" + json, e);
+ }
+ return attributesMap;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 9333456c2e..e956806823 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -100,7 +100,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return this;
}
- public void queue(String queueName, String owner, boolean exclusive, FieldTable arguments)
+ public void queue(UUID id, String queueName, String owner, boolean exclusive, FieldTable arguments)
{
try
{
@@ -108,7 +108,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if (q == null)
{
- q = AMQQueueFactory.createAMQQueueImpl(queueName, true, owner, false, exclusive, _virtualHost,
+ q = AMQQueueFactory.createAMQQueueImpl(id, queueName, true, owner, false, exclusive, _virtualHost,
FieldTable.convertToMap(arguments));
_virtualHost.getQueueRegistry().registerQueue(q);
}
@@ -130,7 +130,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
return this;
}
- public void exchange(String exchangeName, String type, boolean autoDelete)
+ public void exchange(UUID id, String exchangeName, String type, boolean autoDelete)
{
try
{
@@ -139,7 +139,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeNameSS);
if (exchange == null)
{
- exchange = _virtualHost.getExchangeFactory().createExchange(exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
+ exchange = _virtualHost.getExchangeFactory().createExchange(id, exchangeNameSS, new AMQShortString(type), true, autoDelete, 0);
_virtualHost.getExchangeRegistry().registerExchange(exchange);
}
}
@@ -212,7 +212,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
for(Transaction.Record record : enqueues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -265,13 +265,13 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getResourceName()));
+ record.getQueue().getId().toString()));
}
}
for(Transaction.Record record : dequeues)
{
- final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getResourceName());
+ final AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(record.getQueue().getId());
if(queue != null)
{
final long messageId = record.getMessage().getMessageNumber();
@@ -315,7 +315,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
StringBuilder xidString = xidAsString(id);
CurrentActor.get().message(_logSubject,
TransactionLogMessages.XA_INCOMPLETE_QUEUE(xidString.toString(),
- record.getQueue().getResourceName()));
+ record.getQueue().getId().toString()));
}
}
@@ -354,21 +354,22 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
CurrentActor.get().message(_logSubject, TransactionLogMessages.RECOVERY_COMPLETE(null, false));
}
- public void binding(String exchangeName, String queueName, String bindingKey, ByteBuffer buf)
+ @Override
+ public void binding(UUID bindingId, UUID exchangeId, UUID queueId, String bindingKey, ByteBuffer buf)
{
try
{
- Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeName);
+ Exchange exchange = _virtualHost.getExchangeRegistry().getExchange(exchangeId);
if (exchange == null)
{
- _logger.error("Unknown exchange: " + exchangeName + ", cannot bind queue : " + queueName);
+ _logger.error("Unknown exchange id " + exchangeId + ", cannot bind queue with id " + queueId);
return;
}
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(new AMQShortString(queueName));
+
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
if (queue == null)
{
- _logger.error("Unknown queue: " + queueName + ", cannot be bound to exchange: " + exchangeName);
+ _logger.error("Unknown queue id " + queueId + ", cannot be bound to exchange: " + exchange.getName());
}
else
{
@@ -392,10 +393,10 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if(bf.getBinding(bindingKey, queue, exchange, argumentMap) == null)
{
- _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queueName
+ _logger.info("Restoring binding: (Exchange: " + exchange.getNameShortString() + ", Queue: " + queue.getName()
+ ", Routing Key: " + bindingKey + ", Arguments: " + argumentsFT + ")");
- bf.restoreBinding(bindingKey, queue, exchange, argumentMap);
+ bf.restoreBinding(bindingId, bindingKey, queue, exchange, argumentMap);
}
}
}
@@ -417,16 +418,14 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
- public void queueEntry(final String queueName, long messageId)
+ public void queueEntry(final UUID queueId, long messageId)
{
- AMQShortString queueNameShortString = new AMQShortString(queueName);
-
- AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueNameShortString);
-
+ AMQQueue queue = _virtualHost.getQueueRegistry().getQueue(queueId);
try
{
if(queue != null)
{
+ String queueName = queue.getName();
ServerMessage message = _recoveredMessages.get(messageId);
_unusedMessages.remove(messageId);
@@ -436,7 +435,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
if (_logger.isDebugEnabled())
{
- _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queue.getNameShortString());
+ _logger.debug("On recovery, delivering " + message.getMessageNumber() + " to " + queueName);
}
Integer count = _queueRecoveries.get(queueName);
@@ -451,7 +450,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
else
{
- _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queue.getNameShortString() + " is unknown, entry will be discarded");
+ _logger.warn("Message id " + messageId + " referenced in log as enqueued in queue " + queueName + " is unknown, entry will be discarded");
Transaction txn = _store.newTransaction();
txn.dequeueMessage(queue, new DummyMessage(messageId));
txn.commitTranAsync();
@@ -459,15 +458,15 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
}
else
{
- _logger.warn("Message id " + messageId + " in log references queue " + queueName + " which is not in the configuration, entry will be discarded");
+ _logger.warn("Message id " + messageId + " in log references queue with id " + queueId + " which is not in the configuration, entry will be discarded");
Transaction txn = _store.newTransaction();
TransactionLogResource mockQueue =
new TransactionLogResource()
{
-
- public String getResourceName()
+ @Override
+ public UUID getId()
{
- return queueName;
+ return queueId;
}
};
txn.dequeueMessage(mockQueue, new DummyMessage(messageId));
@@ -479,9 +478,6 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
{
throw new RuntimeException(e);
}
-
-
-
}
public DtxRecordRecoveryHandler completeQueueEntryRecovery()
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index b6ee95a1cb..afd8fd9ed2 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
@@ -45,6 +46,7 @@ import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -274,7 +276,7 @@ public class AbstractHeadersExchangeTestBase extends InternalBrokerBaseCase
public TestQueue(AMQShortString name) throws AMQException
{
- super(name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP);
+ super(UUIDGenerator.generateUUID(), name, false, new AMQShortString("test"), true, false,ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"), Collections.EMPTY_MAP);
ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test").getQueueRegistry().registerQueue(this);
}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
index 1fac4afe29..9034bf9c3a 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/ExchangeMBeanTest.java
@@ -26,6 +26,7 @@ import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.management.common.mbeans.ManagedExchange;
import org.apache.qpid.server.management.ManagedObject;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -52,7 +53,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testGeneralProperties() throws Exception
{
DirectExchange exchange = new DirectExchange();
- exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -67,7 +68,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testDirectExchangeMBean() throws Exception
{
DirectExchange exchange = new DirectExchange();
- exchange.initialise(_virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost, ExchangeDefaults.DIRECT_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -82,7 +83,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testTopicExchangeMBean() throws Exception
{
TopicExchange exchange = new TopicExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.TOPIC_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -97,7 +98,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testHeadersExchangeMBean() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -119,7 +120,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testHeadersExchangeMBeanMatchPropertyNoValue() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
@@ -137,7 +138,7 @@ public class ExchangeMBeanTest extends InternalBrokerBaseCase
public void testInvalidHeaderBindingMalformed() throws Exception
{
HeadersExchange exchange = new HeadersExchange();
- exchange.initialise(_virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
+ exchange.initialise(UUIDGenerator.generateUUID(), _virtualHost,ExchangeDefaults.HEADERS_EXCHANGE_NAME, false, 0, true);
ManagedObject managedObj = exchange.getManagedObject();
ManagedExchange mbean = (ManagedExchange)managedObj;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 2e3ff90df9..e123a968a4 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.queue;
+import java.util.UUID;
+
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
index 8041d59ffa..52ad4a7c5b 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/SimpleAMQQueueTest.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.BaseQueue.PostEnqueueAction;
import org.apache.qpid.server.queue.SimpleAMQQueue.QueueEntryFilter;
import org.apache.qpid.server.registry.ApplicationRegistry;
@@ -137,7 +138,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
}
try {
- _queue = new SimpleAMQQueue(_qname, false, _owner, false, false,null, Collections.EMPTY_MAP);
+ _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, _owner, false,false, null, Collections.EMPTY_MAP);
assertNull("Queue was created", _queue);
}
catch (IllegalArgumentException e)
@@ -479,7 +480,7 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testAutoDeleteQueue() throws Exception
{
_queue.stop();
- _queue = new SimpleAMQQueue(_qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
+ _queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), _qname, false, null, true, false, _virtualHost, Collections.EMPTY_MAP);
_queue.setDeleteOnNoConsumers(true);
_queue.registerSubscription(_subscription, false);
AMQMessage message = createMessage(new Long(25));
@@ -691,8 +692,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testProcessQueueWithUniqueSelectors() throws Exception
{
TestSimpleQueueEntryListFactory factory = new TestSimpleQueueEntryListFactory();
- SimpleAMQQueue testQueue = new SimpleAMQQueue("testQueue", false, "testOwner",false,
- false, _virtualHost, factory, null)
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), "testQueue", false,"testOwner",
+ false, false, _virtualHost, factory, null)
{
@Override
public void deliverAsync(Subscription sub)
@@ -1028,8 +1029,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
int dequeueMessageIndex = 1;
// create queue with overridden method deliverAsync
- SimpleAMQQueue testQueue = new SimpleAMQQueue(new AMQShortString("test"), false,
- new AMQShortString("testOwner"), false, false, _virtualHost, null)
+ SimpleAMQQueue testQueue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"),
+ false, new AMQShortString("testOwner"), false, false, _virtualHost, null)
{
@Override
public void deliverAsync(Subscription sub)
@@ -1099,8 +1100,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testEqueueDequeuedEntry()
{
// create a queue where each even entry is considered a dequeued
- SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("test"), false, new AMQShortString("testOwner"),
- false, false, _virtualHost, new QueueEntryListFactory()
+ SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("test"), false,
+ new AMQShortString("testOwner"), false, false, _virtualHost, new QueueEntryListFactory()
{
public QueueEntryList createQueueEntryList(AMQQueue queue)
{
@@ -1177,8 +1178,8 @@ public class SimpleAMQQueueTest extends InternalBrokerBaseCase
public void testActiveConsumerCount() throws Exception
{
- final SimpleAMQQueue queue = new SimpleAMQQueue(new AMQShortString("testActiveConsumerCount"), false, new AMQShortString("testOwner"),
- false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
+ final SimpleAMQQueue queue = new SimpleAMQQueue(UUIDGenerator.generateUUID(), new AMQShortString("testActiveConsumerCount"), false,
+ new AMQShortString("testOwner"), false, false, _virtualHost, new SimpleQueueEntryList.Factory(), null);
//verify adding an active subscription increases the count
final MockSubscription subscription1 = new MockSubscription();
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
new file mode 100644
index 0000000000..a1cbb2cbc8
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/DurableConfigurationStoreTest.java
@@ -0,0 +1,377 @@
+package org.apache.qpid.server.store;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Matchers.isA;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+import static org.mockito.Mockito.times;
+
+import java.io.File;
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+
+import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.AMQStoreException;
+import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.binding.Binding;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.logging.SystemOutMessageLogger;
+import org.apache.qpid.server.logging.actors.CurrentActor;
+import org.apache.qpid.server.logging.actors.TestLogActor;
+import org.apache.qpid.server.logging.subjects.TestBlankSubject;
+import org.apache.qpid.server.message.EnqueableMessage;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MockStoredMessage;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler;
+import org.apache.qpid.server.store.TransactionLogRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.BindingRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.ExchangeRecoveryHandler;
+import org.apache.qpid.server.store.ConfigurationRecoveryHandler.QueueRecoveryHandler;
+import org.apache.qpid.server.store.MessageStoreRecoveryHandler.StoredMessageRecoveryHandler;
+import org.apache.qpid.server.store.Transaction.Record;
+import org.apache.qpid.server.store.derby.DerbyMessageStoreFactory;
+import org.apache.qpid.test.utils.QpidTestCase;
+import org.apache.qpid.util.FileUtils;
+
+public class DurableConfigurationStoreTest extends QpidTestCase
+{
+ private static final String EXCHANGE_NAME = "exchangeName";
+ private String _storePath;
+ private String _storeName;
+ private MessageStore _store;
+ private Configuration _configuration;
+
+ private ConfigurationRecoveryHandler _recoveryHandler;
+ private QueueRecoveryHandler _queueRecoveryHandler;
+ private ExchangeRecoveryHandler _exchangeRecoveryHandler;
+ private BindingRecoveryHandler _bindingRecoveryHandler;
+ private ConfigurationRecoveryHandler.BrokerLinkRecoveryHandler _linkRecoveryHandler;
+ private MessageStoreRecoveryHandler _messageStoreRecoveryHandler;
+ private StoredMessageRecoveryHandler _storedMessageRecoveryHandler;
+ private TransactionLogRecoveryHandler _logRecoveryHandler;
+ private TransactionLogRecoveryHandler.QueueEntryRecoveryHandler _queueEntryRecoveryHandler;
+ private TransactionLogRecoveryHandler.DtxRecordRecoveryHandler _dtxRecordRecoveryHandler;
+
+ private Exchange _exchange = mock(Exchange.class);
+ private static final String ROUTING_KEY = "routingKey";
+ private static final String QUEUE_NAME = "queueName";
+ private FieldTable _bindingArgs;
+ private UUID _queueId;
+ private UUID _exchangeId;
+
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ _queueId = UUIDGenerator.generateUUID();
+ _exchangeId = UUIDGenerator.generateUUID();
+
+ _storeName = getName();
+ _storePath = TMP_FOLDER + "/" + _storeName;
+ FileUtils.delete(new File(_storePath), true);
+ setTestSystemProperty("QPID_WORK", TMP_FOLDER);
+ _configuration = mock(Configuration.class);
+ _recoveryHandler = mock(ConfigurationRecoveryHandler.class);
+ _queueRecoveryHandler = mock(QueueRecoveryHandler.class);
+ _exchangeRecoveryHandler = mock(ExchangeRecoveryHandler.class);
+ _bindingRecoveryHandler = mock(BindingRecoveryHandler.class);
+ _storedMessageRecoveryHandler = mock(StoredMessageRecoveryHandler.class);
+ _logRecoveryHandler = mock(TransactionLogRecoveryHandler.class);
+ _messageStoreRecoveryHandler = mock(MessageStoreRecoveryHandler.class);
+ _queueEntryRecoveryHandler = mock(TransactionLogRecoveryHandler.QueueEntryRecoveryHandler.class);
+ _dtxRecordRecoveryHandler = mock(TransactionLogRecoveryHandler.DtxRecordRecoveryHandler.class);
+
+ when(_messageStoreRecoveryHandler.begin()).thenReturn(_storedMessageRecoveryHandler);
+ when(_recoveryHandler.begin(isA(MessageStore.class))).thenReturn(_queueRecoveryHandler);
+ when(_queueRecoveryHandler.completeQueueRecovery()).thenReturn(_exchangeRecoveryHandler);
+ when(_exchangeRecoveryHandler.completeExchangeRecovery()).thenReturn(_bindingRecoveryHandler);
+ when(_bindingRecoveryHandler.completeBindingRecovery()).thenReturn(_linkRecoveryHandler);
+ when(_logRecoveryHandler.begin(any(MessageStore.class))).thenReturn(_queueEntryRecoveryHandler);
+ when(_queueEntryRecoveryHandler.completeQueueEntryRecovery()).thenReturn(_dtxRecordRecoveryHandler);
+ when(_exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(EXCHANGE_NAME));
+ when(_exchange.getId()).thenReturn(_exchangeId);
+ when(_configuration.getString(eq(MessageStoreConstants.ENVIRONMENT_PATH_PROPERTY), anyString())).thenReturn(
+ _storePath);
+
+ _bindingArgs = new FieldTable();
+ AMQShortString argKey = AMQPFilterTypes.JMS_SELECTOR.getValue();
+ String argValue = "some selector expression";
+ _bindingArgs.put(argKey, argValue);
+
+ reopenStore();
+ }
+
+ public void tearDown() throws Exception
+ {
+ FileUtils.delete(new File(_storePath), true);
+ super.tearDown();
+ }
+
+ public void testCreateExchange() throws Exception
+ {
+ Exchange exchange = createTestExchange();
+ _store.createExchange(exchange);
+
+ reopenStore();
+ verify(_exchangeRecoveryHandler).exchange(_exchangeId, getName(), getName() + "Type", true);
+ }
+
+ public void testRemoveExchange() throws Exception
+ {
+ Exchange exchange = createTestExchange();
+ _store.createExchange(exchange);
+
+ _store.removeExchange(exchange);
+
+ reopenStore();
+ verify(_exchangeRecoveryHandler, never()).exchange(any(UUID.class), anyString(), anyString(), anyBoolean());
+ }
+
+ public void testBindQueue() throws Exception
+ {
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange,
+ FieldTable.convertToMap(_bindingArgs));
+ _store.bindQueue(binding);
+
+ reopenStore();
+
+ ByteBuffer argsAsBytes = ByteBuffer.wrap(_bindingArgs.getDataAsBytes());
+
+ verify(_bindingRecoveryHandler).binding(binding.getId(), _exchange.getId(), queue.getId(), ROUTING_KEY, argsAsBytes);
+ }
+
+ public void testUnbindQueue() throws Exception
+ {
+ AMQQueue queue = createTestQueue(QUEUE_NAME, "queueOwner", false);
+ Binding binding = new Binding(UUIDGenerator.generateUUID(), ROUTING_KEY, queue, _exchange,
+ FieldTable.convertToMap(_bindingArgs));
+ _store.bindQueue(binding);
+
+ _store.unbindQueue(binding);
+ reopenStore();
+
+ verify(_bindingRecoveryHandler, never()).binding(any(UUID.class), any(UUID.class), any(UUID.class), anyString(),
+ isA(ByteBuffer.class));
+ }
+
+ public void testCreateQueueAMQQueue() throws Exception
+ {
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
+ _store.createQueue(queue);
+
+ reopenStore();
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, null);
+ }
+
+ public void testCreateQueueAMQQueueFieldTable() throws Exception
+ {
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
+ attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+
+ FieldTable arguments = FieldTable.convertToFieldTable(attributes);
+ _store.createQueue(queue, arguments);
+
+ reopenStore();
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", true, arguments);
+ }
+
+ public void testUpdateQueue() throws Exception
+ {
+ // create queue
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
+ attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ FieldTable arguments = FieldTable.convertToFieldTable(attributes);
+ _store.createQueue(queue, arguments);
+
+ // update the queue to have exclusive=false
+ queue = createTestQueue(getName(), getName() + "Owner", false);
+ _store.updateQueue(queue);
+
+ reopenStore();
+ verify(_queueRecoveryHandler).queue(_queueId, getName(), getName() + "Owner", false, arguments);
+ }
+
+ public void testRemoveQueue() throws Exception
+ {
+ // create queue
+ AMQQueue queue = createTestQueue(getName(), getName() + "Owner", true);
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put("x-qpid-dlq-enabled", Boolean.TRUE);
+ attributes.put("x-qpid-maximum-delivery-count", new Integer(10));
+ FieldTable arguments = FieldTable.convertToFieldTable(attributes);
+ _store.createQueue(queue, arguments);
+
+ // remove queue
+ _store.removeQueue(queue);
+ reopenStore();
+ verify(_queueRecoveryHandler, never()).queue(any(UUID.class), anyString(), anyString(), anyBoolean(),
+ any(FieldTable.class));
+ }
+
+ private AMQQueue createTestQueue(String queueName, String queueOwner, boolean exclusive) throws AMQStoreException
+ {
+ AMQQueue queue = mock(AMQQueue.class);
+ when(queue.getName()).thenReturn(queueName);
+ when(queue.getNameShortString()).thenReturn(AMQShortString.valueOf(queueName));
+ when(queue.getOwner()).thenReturn(AMQShortString.valueOf(queueOwner));
+ when(queue.isExclusive()).thenReturn(exclusive);
+ when(queue.getId()).thenReturn(_queueId);
+ return queue;
+ }
+
+ private Exchange createTestExchange()
+ {
+ Exchange exchange = mock(Exchange.class);
+ when(exchange.getNameShortString()).thenReturn(AMQShortString.valueOf(getName()));
+ when(exchange.getName()).thenReturn(getName());
+ when(exchange.getTypeShortString()).thenReturn(AMQShortString.valueOf(getName() + "Type"));
+ when(exchange.isAutoDelete()).thenReturn(true);
+ when(exchange.getId()).thenReturn(_exchangeId);
+ return exchange;
+ }
+
+ private void reopenStore() throws Exception
+ {
+ if (_store != null)
+ {
+ _store.close();
+ }
+ _store = createStore();
+
+ _store.configureConfigStore(_storeName, _recoveryHandler, _configuration);
+ _store.configureMessageStore(_storeName, _messageStoreRecoveryHandler, _logRecoveryHandler, _configuration);
+ _store.activate();
+ }
+
+ protected MessageStore createStore() throws Exception
+ {
+ String storeFactoryClass = System.getProperty(MS_FACTORY_CLASS_NAME_KEY);
+ if (storeFactoryClass == null)
+ {
+ storeFactoryClass = DerbyMessageStoreFactory.class.getName();
+ }
+ CurrentActor.set(new TestLogActor(new SystemOutMessageLogger()));
+ MessageStoreFactory factory = (MessageStoreFactory) Class.forName(storeFactoryClass).newInstance();
+ return factory.createMessageStore();
+ }
+
+ public void testRecordXid() throws Exception
+ {
+ Record enqueueRecord = getTestRecord(1);
+ Record dequeueRecord = getTestRecord(2);
+ Record[] enqueues = { enqueueRecord };
+ Record[] dequeues = { dequeueRecord };
+ byte[] globalId = new byte[] { 1 };
+ byte[] branchId = new byte[] { 2 };
+
+ Transaction transaction = _store.newTransaction();
+ transaction.recordXid(1l, globalId, branchId, enqueues, dequeues);
+ transaction.commitTran();
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+
+ transaction = _store.newTransaction();
+ transaction.removeXid(1l, globalId, branchId);
+ transaction.commitTran();
+
+ reopenStore();
+ verify(_dtxRecordRecoveryHandler, times(1)).dtxRecord(1l, globalId, branchId, enqueues, dequeues);
+ }
+
+ private Record getTestRecord(long messageNumber)
+ {
+ UUID queueId1 = UUIDGenerator.generateUUID();
+ TransactionLogResource queue1 = mock(TransactionLogResource.class);
+ when(queue1.getId()).thenReturn(queueId1);
+ EnqueableMessage message1 = mock(EnqueableMessage.class);
+ when(message1.isPersistent()).thenReturn(true);
+ when(message1.getMessageNumber()).thenReturn(messageNumber);
+ when(message1.getStoredMessage()).thenReturn(new MockStoredMessage(messageNumber));
+ Record enqueueRecord = new TestRecord(queue1, message1);
+ return enqueueRecord;
+ }
+
+ private static class TestRecord implements Record
+ {
+ private TransactionLogResource _queue;
+ private EnqueableMessage _message;
+
+ public TestRecord(TransactionLogResource queue, EnqueableMessage message)
+ {
+ super();
+ _queue = queue;
+ _message = message;
+ }
+
+ @Override
+ public TransactionLogResource getQueue()
+ {
+ return _queue;
+ }
+
+ @Override
+ public EnqueableMessage getMessage()
+ {
+ return _message;
+ }
+
+ @Override
+ public int hashCode()
+ {
+ final int prime = 31;
+ int result = 1;
+ result = prime * result + ((_message == null) ? 0 : new Long(_message.getMessageNumber()).hashCode());
+ result = prime * result + ((_queue == null) ? 0 : _queue.getId().hashCode());
+ return result;
+ }
+
+ @Override
+ public boolean equals(Object obj)
+ {
+ if (this == obj)
+ {
+ return true;
+ }
+ if (obj == null)
+ {
+ return false;
+ }
+ if (!(obj instanceof Record))
+ {
+ return false;
+ }
+ Record other = (Record) obj;
+ if (_message == null && other.getMessage() != null)
+ {
+ return false;
+ }
+ if (_queue == null && other.getQueue() != null)
+ {
+ return false;
+ }
+ if (_message.getMessageNumber() != other.getMessage().getMessageNumber())
+ {
+ return false;
+ }
+ return _queue.getId().equals(other.getQueue().getId());
+ }
+
+ }
+}
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
index c589bd108b..3fb0776083 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/store/MessageStoreTest.java
@@ -39,6 +39,7 @@ import org.apache.qpid.server.exchange.ExchangeType;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.message.AMQMessage;
import org.apache.qpid.server.message.MessageMetaData;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.queue.AMQPriorityQueue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.AMQQueueFactory;
@@ -58,6 +59,7 @@ import java.io.File;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.UUID;
/**
* This tests the MessageStores by using the available interfaces.
@@ -739,7 +741,7 @@ public class MessageStoreTest extends InternalBrokerBaseCase
try
{
- exchange = type.newInstance(getVirtualHost(), name, durable, 0, false);
+ exchange = type.newInstance(UUIDGenerator.generateUUID(), getVirtualHost(), name, durable, 0, false);
}
catch (AMQException e)
{
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
index 5a11a7aa32..8a34e92985 100644
--- a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/InternalBrokerBaseCase.java
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.server.util;
+import java.util.UUID;
+
import org.apache.commons.configuration.XMLConfiguration;
import org.apache.qpid.AMQException;
diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java
new file mode 100644
index 0000000000..56567523df
--- /dev/null
+++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/util/MapJsonSerializerTest.java
@@ -0,0 +1,53 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.util;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import junit.framework.TestCase;
+
+public class MapJsonSerializerTest extends TestCase
+{
+ private MapJsonSerializer _serializer;
+
+ protected void setUp() throws Exception
+ {
+ super.setUp();
+ _serializer = new MapJsonSerializer();
+
+ }
+
+ public void testSerializeDeserialize()
+ {
+ Map<String, Object> testMap = new HashMap<String, Object>();
+ testMap.put("string", "Test String");
+ testMap.put("integer", new Integer(10));
+ testMap.put("long", new Long(Long.MAX_VALUE));
+ testMap.put("boolean", Boolean.TRUE);
+
+ String jsonString = _serializer.serialize(testMap);
+ Map<String, Object> deserializedMap = _serializer.deserialize(jsonString);
+
+ assertEquals(deserializedMap, testMap);
+ }
+
+}