summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-17 01:07:34 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-17 01:07:34 +0000
commit7177135ca38651943b3701b171ef29e4fa52ad86 (patch)
tree4d221e6969f959b7c0c2a3c0736f71bdf935b459 /qpid/java
parent359bd6e75abf11027b668d33d2d733b4cd399e38 (diff)
downloadqpid-python-7177135ca38651943b3701b171ef29e4fa52ad86.tar.gz
QPID-5709 : [Java Broker] Replace exchange registry / factory with use of common configured object mechanism for registration of children
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588126 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java17
-rw-r--r--qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java12
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java63
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java13
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java162
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java128
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java241
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java18
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java39
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java66
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java77
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java16
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java61
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java48
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java112
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java126
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java159
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java234
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java17
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java41
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java90
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java239
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java79
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java283
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java44
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java10
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java4
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java31
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java41
-rw-r--r--qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory4
-rw-r--r--qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory19
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java10
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java3
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java6
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java18
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java7
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java15
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java5
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java23
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java229
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java21
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java9
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java1
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java77
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java204
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java47
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java198
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java23
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java10
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java1
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java17
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java29
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java18
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java3
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java4
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java39
-rw-r--r--qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java21
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java5
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java39
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java23
-rwxr-xr-xqpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java17
-rw-r--r--qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java13
86 files changed, 1678 insertions, 2226 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
index 87fc10530e..de17acabab 100644
--- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
+++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBHAVirtualHost.java
@@ -127,10 +127,16 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost>
{
_messageStore.getEnvironmentFacade().getEnvironment().flushLog(true);
- ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
- _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
-
- initialiseModel();
+ if(isStoreEmpty())
+ {
+ createDefaultExchanges();
+ }
+ else
+ {
+ ConfiguredObjectRecordHandler upgraderRecoverer =
+ new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+ _messageStore.visitConfiguredObjectRecords(upgraderRecoverer);
+ }
new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
@@ -157,8 +163,7 @@ public class BDBHAVirtualHost extends AbstractVirtualHost<BDBHAVirtualHost>
getConnectionRegistry().close(IConnectionRegistry.VHOST_PASSIVATE_REPLY_TEXT);
removeHouseKeepingTasks();
- getQueueRegistry().stopAllAndUnregisterMBeans();
- getExchangeRegistry().clearAndUnregisterMbeans();
+ getQueueRegistry().close();
getDtxRegistry().close();
finalState = VirtualHostState.PASSIVE;
diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
index 7b1355aa45..3e23df6d87 100644
--- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
+++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/VirtualHostTest.java
@@ -38,6 +38,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
@@ -145,7 +146,7 @@ public class VirtualHostTest extends QpidTestCase
private VirtualHost<?,?,?> createHost(Map<String, Object> attributes)
{
- ConfiguredObjectFactory factory = new ConfiguredObjectFactory(Model.getInstance());
+ ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(Model.getInstance());
ConfiguredObjectTypeFactory vhostFactory =
factory.getConfiguredObjectTypeFactory(VirtualHost.class, attributes);
attributes = new HashMap<String, Object>(attributes);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
index 038667249e..af2315f919 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/Broker.java
@@ -44,6 +44,7 @@ import org.apache.qpid.server.logging.SystemOutMessageLogger;
import org.apache.qpid.server.logging.log4j.LoggingManagementFacade;
import org.apache.qpid.server.logging.messages.BrokerMessages;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
@@ -60,6 +61,7 @@ public class Broker
private volatile IApplicationRegistry _applicationRegistry;
private EventLogger _eventLogger;
private boolean _configuringOwnLogging = false;
+ private final TaskExecutor _taskExecutor = new TaskExecutor();
protected static class InitException extends RuntimeException
{
@@ -85,6 +87,8 @@ public class Broker
{
_applicationRegistry.close();
}
+ _taskExecutor.stop();
+
}
finally
{
@@ -134,10 +138,10 @@ public class Broker
}
LogRecorder logRecorder = new LogRecorder();
- TaskExecutor taskExecutor = new TaskExecutor();
- taskExecutor.start();
- ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance());
- SystemContext systemContext = new SystemContextImpl(taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options);
+
+ _taskExecutor.start();
+ ConfiguredObjectFactory configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ SystemContext systemContext = new SystemContextImpl(_taskExecutor, configuredObjectFactory, _eventLogger, logRecorder, options);
BrokerConfigurationStoreCreator storeCreator = new BrokerConfigurationStoreCreator();
DurableConfigurationStore store = storeCreator.createStore(systemContext, storeType, options.getInitialConfigurationLocation(),
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
index ba8b553b40..634b250d31 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java
@@ -38,8 +38,10 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.StateChangeListener;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class BindingImpl
extends AbstractConfiguredObject<BindingImpl>
@@ -48,8 +50,7 @@ public class BindingImpl
private final String _bindingKey;
private final AMQQueue _queue;
private final ExchangeImpl _exchange;
- private final Map<String, Object> _arguments;
- private final UUID _id;
+ private Map<String, Object> _arguments;
private final AtomicLong _matches = new AtomicLong();
private final BindingLogSubject _logSubject;
@@ -81,7 +82,6 @@ public class BindingImpl
public BindingImpl(UUID id, Map<String, Object> attributes, AMQQueue queue, ExchangeImpl exchange)
{
super(parentsMap(queue,exchange),enhanceWithDurable(combineIdWithAttributes(id, attributes), queue, exchange),queue.getVirtualHost().getTaskExecutor());
- _id = id;
_bindingKey = (String)attributes.get(org.apache.qpid.server.model.Binding.NAME);
_queue = queue;
_exchange = exchange;
@@ -198,7 +198,7 @@ public class BindingImpl
public String toString()
{
- return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + _id + " }";
+ return "Binding{bindingKey="+_bindingKey+", exchange="+_exchange+", queue="+_queue+", id= " + getId() + " }";
}
public void delete()
@@ -229,36 +229,6 @@ public class BindingImpl
}
@Override
- public Object getAttribute(final String name)
- {
- if(ID.equals(name))
- {
- return getId();
- }
- else if(NAME.equals(name))
- {
- return _bindingKey;
- }
- else if(DURABLE.equals(name))
- {
- return isDurable();
- }
- else if(LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if(QUEUE.equals(name))
- {
- return _queue;
- }
- else if(EXCHANGE.equals(name))
- {
- return _exchange;
- }
- return super.getAttribute(name);
- }
-
- @Override
public Object setAttribute(final String name, final Object expected, final Object desired) throws IllegalStateException,
AccessControlException, IllegalArgumentException
{
@@ -276,4 +246,29 @@ public class BindingImpl
{
return _exchange.getEventLogger();
}
+
+ public void setArguments(final Map<String, Object> arguments)
+ {
+ if(getTaskExecutor().isTaskExecutorThread())
+ {
+ _arguments = arguments;
+ super.setAttribute(ARGUMENTS, getActualAttributes().get(ARGUMENTS), arguments);
+ if (isDurable())
+ {
+ VirtualHostImpl<?, ?, ?> vhost = (VirtualHostImpl<?, ?, ?>) _exchange.getParent(VirtualHost.class);
+ vhost.getDurableConfigurationStore().update(true, asObjectRecord());
+ }
+ }
+ else
+ {
+ getTaskExecutor().submitAndWait(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ setArguments(arguments);
+ }
+ });
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
index 43ff07e6d0..4e3601efcc 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/configuration/updater/TaskExecutor.java
@@ -142,6 +142,19 @@ public class TaskExecutor
return future;
}
+ public void submitAndWait(final Runnable task) throws CancellationException
+ {
+ submitAndWait(new Task<Void>()
+ {
+ @Override
+ public Void call()
+ {
+ task.run();
+ return null;
+ }
+ });
+ }
+
public <T> T submitAndWait(Task<T> task) throws CancellationException
{
try
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
index d135d05e64..b1dd6a3721 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java
@@ -37,6 +37,7 @@ import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogSubject;
@@ -54,7 +55,6 @@ import org.apache.qpid.server.model.ManagedAttributeField;
import org.apache.qpid.server.model.Publisher;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
@@ -65,6 +65,7 @@ import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.StateChangeListener;
import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
@@ -111,7 +112,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
super(parentsMap(vhost), attributes, vhost.getTaskExecutor());
_virtualHost = vhost;
// check ACL
- _virtualHost.getSecurityManager().authoriseCreateExchange(this);
+ try
+ {
+ _virtualHost.getSecurityManager().authoriseCreateExchange(this);
+ }
+ catch (AccessControlException e)
+ {
+ deleted();
+ throw e;
+ }
_logSubject = new ExchangeLogSubject(this, this.getVirtualHost());
@@ -129,15 +138,52 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
+ public void validate()
+ {
+ super.validate();
+
+ if(!_virtualHost.getSecurityManager().isSystemProcess())
+ {
+ if (isReservedExchangeName(getName()))
+ {
+ deleted();
+ throw new ReservedExchangeNameException(getName());
+ }
+ }
+ }
+
+ private boolean isReservedExchangeName(String name)
+ {
+ if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
+ || name.startsWith("amq.") || name.startsWith("qpid."))
+ {
+ return true;
+ }
+ return false;
+ }
+
+
+ @Override
protected void onOpen()
{
super.onOpen();
- postSetAlternateExchange();
+
// Log Exchange creation
getEventLogger().message(ExchangeMessages.CREATED(getExchangeType().getType(), getName(), isDurable()));
}
@Override
+ protected void onCreate()
+ {
+ super.onCreate();
+ if(isDurable())
+ {
+ DurableConfigurationStoreHelper.createExchange(getVirtualHost().getDurableConfigurationStore(), this);
+ }
+
+ }
+
+ @Override
public EventLogger getEventLogger()
{
return _virtualHost.getEventLogger();
@@ -156,8 +202,25 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
return getLifetimePolicy() != LifetimePolicy.PERMANENT;
}
- public void close()
+ public void delete()
{
+ _virtualHost.getSecurityManager().authoriseDelete(this);
+
+ if(hasReferrers())
+ {
+ throw new ExchangeIsAlternateException(getName());
+ }
+
+ if(getExchangeType().getDefaultExchangeName().equals( getName() ))
+ {
+ throw new RequiredExchangeException(getName());
+ }
+
+ if (isDurable() && !isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this);
+
+ }
if(_closed.compareAndSet(false,true))
{
@@ -180,7 +243,15 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
task.performAction(this);
}
_closeTaskList.clear();
+
+ if (isDurable() && !isAutoDelete())
+ {
+ DurableConfigurationStoreHelper.removeExchange(getVirtualHost().getDurableConfigurationStore(), this);
+
+ }
}
+ deleted();
+
}
public String toString()
@@ -623,10 +694,7 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
if (id == null)
{
- id = UUIDGenerator.generateBindingUUID(getName(),
- queue.getName(),
- bindingKey,
- _virtualHost.getName());
+ id = UUID.randomUUID();
}
Map<String,Object> attributes = new HashMap<String, Object>();
@@ -636,36 +704,47 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
attributes.put(org.apache.qpid.server.model.Binding.ARGUMENTS, arguments);
}
- BindingImpl b = new BindingImpl(id, attributes, queue, this);
-
- BindingImpl existingMapping = _bindingsMap.putIfAbsent(new BindingIdentifier(bindingKey,queue), b);
- if (existingMapping == null || force)
+ BindingImpl existingMapping;
+ synchronized(this)
{
- b.addStateChangeListener(_bindingListener);
- b.open();
- if (existingMapping != null)
- {
- existingMapping.delete();
- }
+ BindingIdentifier bindingIdentifier = new BindingIdentifier(bindingKey, queue);
+ existingMapping = _bindingsMap.get(bindingIdentifier);
- if (b.isDurable() && !restore)
+ if (existingMapping == null)
{
- DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
- }
+ BindingImpl b = new BindingImpl(id, attributes, queue, this);
+ b.addStateChangeListener(_bindingListener);
+ b.open();
- queue.addBinding(b);
- childAdded(b);
+ if (b.isDurable() && !restore)
+ {
+ DurableConfigurationStoreHelper.createBinding(_virtualHost.getDurableConfigurationStore(), b);
+ }
+ _bindingsMap.put(bindingIdentifier, b);
+ queue.addBinding(b);
+ childAdded(b);
- doAddBinding(b);
+ doAddBinding(b);
- return true;
- }
- else
- {
- return false;
+ return true;
+ }
+ else if(force)
+ {
+ Map<String,Object> oldArguments = existingMapping.getArguments();
+ existingMapping.setArguments(arguments);
+ onBindingUpdated(existingMapping, oldArguments);
+ return true;
+ }
+ else
+ {
+ return false;
+ }
}
}
+ protected abstract void onBindingUpdated(final BindingImpl binding,
+ final Map<String, Object> oldArguments);
+
@Override
protected boolean setState(final State currentState, final State desiredState)
{
@@ -803,37 +882,12 @@ public abstract class AbstractExchange<T extends AbstractExchange<T>>
}
@Override
- public void delete()
- {
- try
- {
- _virtualHost.removeExchange(this,true);
- }
- catch (ExchangeIsAlternateException e)
- {
- throw new UnsupportedOperationException(e.getMessage(),e);
- }
- catch (RequiredExchangeException e)
- {
- throw new UnsupportedOperationException("'"+e.getMessage()+"' is a reserved exchange and can't be deleted",e);
- }
- }
-
- @Override
public Object getAttribute(final String name)
{
if(ConfiguredObject.STATE.equals(name))
{
return getState();
}
- else if(LIFETIME_POLICY.equals(name))
- {
- return getLifetimePolicy();
- }
- else if(DURABLE.equals(name))
- {
- return isDurable();
- }
return super.getAttribute(name);
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
deleted file mode 100644
index 4dc9811934..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.plugin.QpidServiceLoader;
-import org.apache.qpid.server.util.MapValueConverter;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class DefaultExchangeFactory implements ExchangeFactory
-{
- public static final String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
-
- private static final Logger LOGGER = Logger.getLogger(DefaultExchangeFactory.class);
-
- private static final String[] BASE_EXCHANGE_TYPES =
- new String[]{ExchangeDefaults.DIRECT_EXCHANGE_CLASS,
- ExchangeDefaults.FANOUT_EXCHANGE_CLASS,
- ExchangeDefaults.HEADERS_EXCHANGE_CLASS,
- ExchangeDefaults.TOPIC_EXCHANGE_CLASS};
-
- private final VirtualHostImpl _host;
- private Map<String, ExchangeType<? extends ExchangeImpl>> _exchangeClassMap = new HashMap<String, ExchangeType<? extends ExchangeImpl>>();
-
- public DefaultExchangeFactory(VirtualHostImpl host)
- {
- _host = host;
-
- @SuppressWarnings("rawtypes")
- Iterable<ExchangeType> exchangeTypes = loadExchangeTypes();
- for (ExchangeType<?> exchangeType : exchangeTypes)
- {
- String typeName = exchangeType.getType();
-
- if(LOGGER.isDebugEnabled())
- {
- LOGGER.debug("Registering exchange type '" + typeName + "' using class '" + exchangeType.getClass().getName() + "'");
- }
-
- if(_exchangeClassMap.containsKey(typeName))
- {
- ExchangeType<?> existingType = _exchangeClassMap.get(typeName);
-
- throw new IllegalStateException("ExchangeType with type name '" + typeName + "' is already registered using class '"
- + existingType.getClass().getName() + "', can not register class '"
- + exchangeType.getClass().getName() + "'");
- }
-
- _exchangeClassMap.put(typeName, exchangeType);
- }
-
- for(String type : BASE_EXCHANGE_TYPES)
- {
- if(!_exchangeClassMap.containsKey(type))
- {
- throw new IllegalStateException("Did not find expected exchange type: " + type);
- }
- }
- }
-
- @SuppressWarnings("rawtypes")
- protected Iterable<ExchangeType> loadExchangeTypes()
- {
- return new QpidServiceLoader<ExchangeType>().atLeastOneInstanceOf(ExchangeType.class);
- }
-
- public Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes()
- {
- return _exchangeClassMap.values();
- }
-
- @Override
- public ExchangeImpl createExchange(final Map<String, Object> attributes)
- throws AMQUnknownExchangeType, UnknownExchangeException
- {
- ExchangeImpl exchange = createExchangeInstance(attributes);
- ((AbstractExchange)exchange).create();
- return exchange;
- }
-
- private ExchangeImpl createExchangeInstance(final Map<String, Object> attributes)
- {
- String type = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.TYPE, attributes);
- ExchangeType<? extends ExchangeImpl> exchType = _exchangeClassMap.get(type);
- if (exchType == null)
- {
- throw new AMQUnknownExchangeType("Unknown exchange type: " + type,null);
- }
- return exchType.newInstance(_host, attributes);
-
- }
-
- @Override
- public ExchangeImpl restoreExchange(Map<String,Object> attributes)
- throws AMQUnknownExchangeType, UnknownExchangeException
- {
- ExchangeImpl exchange = createExchangeInstance(attributes);
- exchange.open();
- return exchange;
-
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
deleted file mode 100644
index a79601bced..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeRegistry.java
+++ /dev/null
@@ -1,241 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-
-import org.apache.log4j.Logger;
-
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.server.message.MessageDestination;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.queue.QueueRegistry;
-import org.apache.qpid.server.store.DurableConfigurationStore;
-import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-
-public class DefaultExchangeRegistry implements ExchangeRegistry
-{
- private static final Logger LOGGER = Logger.getLogger(DefaultExchangeRegistry.class);
- /**
- * Maps from exchange name to exchange instance
- */
- private ConcurrentMap<String, ExchangeImpl<?>> _exchangeMap = new ConcurrentHashMap<String, ExchangeImpl<?>>();
-
- private MessageDestination _defaultExchange;
-
- private final VirtualHostImpl _host;
- private final QueueRegistry _queueRegistry;
-
- private final Collection<RegistryChangeListener> _listeners =
- Collections.synchronizedCollection(new ArrayList<RegistryChangeListener>());
-
- public DefaultExchangeRegistry(VirtualHostImpl host, QueueRegistry queueRegistry)
- {
- _host = host;
- _queueRegistry = queueRegistry;
- }
-
- public void initialise(ExchangeFactory exchangeFactory)
- {
- //create 'standard' exchanges:
- initialiseExchanges(exchangeFactory, getDurableConfigurationStore());
-
- _defaultExchange =
- new DefaultDestination(_host
- );
-
-
- }
-
- private void initialiseExchanges(ExchangeFactory factory, DurableConfigurationStore store)
- {
- for (ExchangeType<? extends ExchangeImpl> type : factory.getRegisteredTypes())
- {
- defineExchange(factory, type.getDefaultExchangeName(), type.getType(), store);
- }
-
- }
-
- private void defineExchange(ExchangeFactory f, String name, String type, DurableConfigurationStore store)
- {
- try
- {
- if(getExchange(name) == null)
- {
- Map<String, Object> attributes = new HashMap<String, Object>();
- attributes.put(org.apache.qpid.server.model.Exchange.ID,
- UUIDGenerator.generateExchangeUUID(name, _host.getName()));
- attributes.put(org.apache.qpid.server.model.Exchange.NAME, name);
- attributes.put(org.apache.qpid.server.model.Exchange.TYPE, type);
- attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- ExchangeImpl<?> exchange = f.createExchange(attributes);
- exchange.open();
- registerExchange(exchange);
- if(exchange.isDurable())
- {
- DurableConfigurationStoreHelper.createExchange(store, exchange);
- }
- }
- }
- catch (AMQUnknownExchangeType e)
- {
- throw new ServerScopedRuntimeException("Unknown exchange type while attempting to initialise exchanges - " +
- "this is because necessary jar files are not on the classpath", e);
- }
- catch (UnknownExchangeException e)
- {
- throw new ServerScopedRuntimeException("Unknown alternate exchange type while attempting to initialise " +
- "a mandatory exchange which should not have an alternate: '" +
- name + "'");
- }
- }
-
- public DurableConfigurationStore getDurableConfigurationStore()
- {
- return _host.getDurableConfigurationStore();
- }
-
- public void registerExchange(ExchangeImpl exchange)
- {
- _exchangeMap.put(exchange.getName(), exchange);
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.exchangeRegistered(exchange);
- }
-
- }
- }
-
- public MessageDestination getDefaultExchange()
- {
- return _defaultExchange;
- }
-
- public boolean unregisterExchange(String name, boolean inUse)
- {
- final ExchangeImpl exchange = _exchangeMap.get(name);
- if (exchange != null)
- {
-
- _host.getSecurityManager().authoriseDelete(exchange);
-
- // TODO: check inUse argument
-
- ExchangeImpl e = _exchangeMap.remove(name);
- // if it is null then it was removed by another thread at the same time, we can ignore
- if (e != null)
- {
- e.close();
-
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.exchangeUnregistered(exchange);
- }
- }
-
- }
- }
- return exchange != null;
-
- }
-
- public Collection<ExchangeImpl<?>> getExchanges()
- {
- return new ArrayList<ExchangeImpl<?>>(_exchangeMap.values());
- }
-
- public void addRegistryChangeListener(RegistryChangeListener listener)
- {
- _listeners.add(listener);
- }
-
- public ExchangeImpl<?> getExchange(String name)
- {
- return name == null ? null : _exchangeMap.get(name);
- }
-
- @Override
- public void clearAndUnregisterMbeans()
- {
- for (final ExchangeImpl<?> exchange : getExchanges())
- {
- //TODO: this is a bit of a hack, what if the listeners aren't aware
- //that we are just unregistering the MBean because of HA, and aren't
- //actually removing the exchange as such.
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.exchangeUnregistered(exchange);
- }
- }
- }
- _exchangeMap.clear();
- }
-
- @Override
- public synchronized ExchangeImpl<?> getExchange(UUID exchangeId)
- {
- Collection<ExchangeImpl<?>> exchanges = _exchangeMap.values();
- for (ExchangeImpl<?> exchange : exchanges)
- {
- if (exchange.getId().equals(exchangeId))
- {
- return exchange;
- }
- }
- return null;
-
- }
-
- public boolean isReservedExchangeName(String name)
- {
- if (name == null || ExchangeDefaults.DEFAULT_EXCHANGE_NAME.equals(name)
- || name.startsWith("amq.") || name.startsWith("qpid."))
- {
- return true;
- }
- Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = _host.getExchangeTypes();
- for (ExchangeType<? extends ExchangeImpl> type : registeredTypes)
- {
- if (type.getDefaultExchangeName().equals(name))
- {
- return true;
- }
- }
- return false;
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
index 671cbbe7e7..7b011f6cc1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java
@@ -70,6 +70,11 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
recalculateQueues();
}
+ public synchronized void updateBinding(BindingImpl binding)
+ {
+ recalculateQueues();
+ }
+
private void recalculateQueues()
{
List<BaseQueue> queues = new ArrayList<BaseQueue>(_bindings.size());
@@ -188,6 +193,19 @@ public class DirectExchange extends AbstractExchange<DirectExchange>
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getAMQQueue();
+
+ assert queue != null;
+ assert bindingKey != null;
+
+ BindingSet bindings = _bindingsByKey.get(bindingKey);
+ bindings.updateBinding(binding);
+ }
+
protected void onBind(final BindingImpl binding)
{
String bindingKey = binding.getBindingKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java
new file mode 100644
index 0000000000..c5deb7e3b3
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchangeFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class DirectExchangeFactory extends AbstractConfiguredObjectTypeFactory<DirectExchange>
+{
+ public DirectExchangeFactory()
+ {
+ super(DirectExchange.class);
+ }
+
+ @Override
+ public DirectExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new DirectExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
deleted file mode 100644
index 2731f665ac..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeFactory.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.virtualhost.UnknownExchangeException;
-
-import java.util.Collection;
-import java.util.Map;
-
-
-public interface ExchangeFactory
-{
-
- Collection<ExchangeType<? extends ExchangeImpl>> getRegisteredTypes();
-
- ExchangeImpl createExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
-
- ExchangeImpl restoreExchange(Map<String,Object> attributes) throws AMQUnknownExchangeType, UnknownExchangeException;
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
index ff63a83292..57929b7306 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeImpl.java
@@ -62,7 +62,7 @@ public interface ExchangeImpl<T extends ExchangeImpl<T>> extends Exchange<T>, Ex
void restoreBinding(UUID id, String bindingKey, AMQQueue queue,
Map<String, Object> argumentMap);
- void close();
+ void delete();
/**
* Determines whether a message would be isBound to a particular queue using a specific routing key and arguments
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
deleted file mode 100644
index 7fa7a64a62..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/ExchangeRegistry.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import org.apache.qpid.server.message.MessageDestination;
-
-import java.util.Collection;
-import java.util.UUID;
-
-
-public interface ExchangeRegistry
-{
- void registerExchange(ExchangeImpl<?> exchange);
-
- MessageDestination getDefaultExchange();
-
- void initialise(ExchangeFactory exchangeFactory);
-
- ExchangeImpl<?> getExchange(String exchangeName);
-
- /**
- * Unregister an exchange
- * @param exchange name of the exchange to delete
- * @param ifUnused if true, do NOT delete the exchange if it is in use (has queues bound to it)
- */
- boolean unregisterExchange(String exchange, boolean ifUnused);
-
- void clearAndUnregisterMbeans();
-
- ExchangeImpl<?> getExchange(UUID exchangeId);
-
- Collection<ExchangeImpl<?>> getExchanges();
-
- void addRegistryChangeListener(RegistryChangeListener listener);
-
- /**
- * Validates the name of user custom exchange.
- * <p>
- * Return true if the exchange name is reserved and false otherwise.
- */
- boolean isReservedExchangeName(String name);
-
- interface RegistryChangeListener
- {
- void exchangeRegistered(ExchangeImpl exchange);
- void exchangeUnregistered(ExchangeImpl exchange);
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
index 1484480c5d..97c2ebe5e3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java
@@ -125,6 +125,80 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
}
+ @Override
+ protected synchronized void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ AMQQueue queue = binding.getAMQQueue();
+
+ if (binding.getArguments() == null || binding.getArguments().isEmpty() || !FilterSupport.argumentsContainFilter(
+ binding.getArguments()))
+ {
+ if(oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ {
+ _unfilteredQueues.add(queue);
+ if(_queues.containsKey(queue))
+ {
+ _queues.put(queue,_queues.get(queue)+1);
+ }
+ else
+ {
+ _queues.put(queue, ONE);
+ }
+
+ // No longer any reason to check filters for this queue
+ _filteredQueues.remove(queue);
+ }
+ // else - nothing has changed, remains unfiltered
+ }
+ else
+ {
+ HashMap<AMQQueue,Map<BindingImpl, MessageFilter>> filteredBindings =
+ new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
+
+ Map<BindingImpl,MessageFilter> bindingsForQueue;
+
+ final MessageFilter messageFilter;
+
+ try
+ {
+ messageFilter = FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ _logger.warn("Cannot bind queue " + queue + " to exchange this " + this + " because selector cannot be parsed.", e);
+ return;
+ }
+
+
+ if (oldArguments != null && !oldArguments.isEmpty() && FilterSupport.argumentsContainFilter(oldArguments))
+ {
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>(filteredBindings.remove(binding.getAMQQueue()));
+ }
+ else // previously unfiltered
+ {
+ bindingsForQueue = new HashMap<BindingImpl,MessageFilter>();
+
+ Integer oldValue = _queues.remove(queue);
+ if (ONE.equals(oldValue))
+ {
+ // should start checking filters for this queue
+ _filteredQueues.add(queue);
+ _unfilteredQueues.remove(queue);
+ }
+ else
+ {
+ _queues.put(queue, oldValue - 1);
+ }
+
+ }
+ bindingsForQueue.put(binding, messageFilter);
+ filteredBindings.put(binding.getAMQQueue(),bindingsForQueue);
+
+ _filteredBindings.set(filteredBindings);
+
+ }
+
+ }
protected synchronized void onBind(final BindingImpl binding)
{
@@ -156,8 +230,7 @@ public class FanoutExchange extends AbstractExchange<FanoutExchange>
new HashMap<AMQQueue,Map<BindingImpl, MessageFilter>>(_filteredBindings.get());
Map<BindingImpl, MessageFilter> bindingsForQueue = filteredBindings.remove(binding.getAMQQueue());
- final
- MessageFilter messageFilter =
+ final MessageFilter messageFilter =
FilterSupport.createMessageFilter(binding.getArguments(), binding.getAMQQueue());
if(bindingsForQueue != null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java
new file mode 100644
index 0000000000..5884776778
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchangeFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class FanoutExchangeFactory extends AbstractConfiguredObjectTypeFactory<FanoutExchange>
+{
+ public FanoutExchangeFactory()
+ {
+ super(FanoutExchange.class);
+ }
+
+ @Override
+ public FanoutExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new FanoutExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
index 6aa32c8528..10dd33dee1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java
@@ -22,6 +22,7 @@ package org.apache.qpid.server.exchange;
import java.util.ArrayList;
import java.util.LinkedHashSet;
+import java.util.ListIterator;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
@@ -158,6 +159,21 @@ public class HeadersExchange extends AbstractExchange<HeadersExchange>
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ HeadersBinding headersBinding = new HeadersBinding(binding);
+ ListIterator<HeadersBinding> iter = _bindingHeaderMatchers.listIterator();
+ while(iter.hasNext())
+ {
+ if(iter.next().equals(headersBinding))
+ {
+ iter.set(headersBinding);
+ }
+ }
+
+ }
+
protected void onUnbind(final BindingImpl binding)
{
assert binding != null;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java
new file mode 100644
index 0000000000..b3f9834889
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class HeadersExchangeFactory extends AbstractConfiguredObjectTypeFactory<HeadersExchange>
+{
+ public HeadersExchangeFactory()
+ {
+ super(HeadersExchange.class);
+ }
+
+ @Override
+ public HeadersExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new HeadersExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
index 992d9714cb..683b7b1aa4 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java
@@ -72,6 +72,67 @@ public class TopicExchange extends AbstractExchange<TopicExchange>
return TYPE;
}
+ @Override
+ protected void onBindingUpdated(final BindingImpl binding, final Map<String, Object> oldArguments)
+ {
+ final String bindingKey = binding.getBindingKey();
+ AMQQueue queue = binding.getAMQQueue();
+ Map<String,Object> args = binding.getArguments();
+
+ assert queue != null;
+ assert bindingKey != null;
+
+ _logger.debug("Registering queue " + queue.getName() + " with routing key " + bindingKey);
+
+
+ String routingKey = TopicNormalizer.normalize(bindingKey);
+
+ try
+ {
+
+ if (_bindings.containsKey(binding))
+ {
+ Map<String, Object> oldArgs = _bindings.get(binding);
+ TopicExchangeResult result = _topicExchangeResults.get(routingKey);
+
+ if (FilterSupport.argumentsContainFilter(args))
+ {
+ if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.replaceQueueFilter(queue,
+ FilterSupport.createMessageFilter(oldArgs, queue),
+ FilterSupport.createMessageFilter(args, queue));
+ }
+ else
+ {
+ result.addFilteredQueue(queue, FilterSupport.createMessageFilter(args, queue));
+ result.removeUnfilteredQueue(queue);
+ }
+ }
+ else
+ {
+ if (FilterSupport.argumentsContainFilter(oldArgs))
+ {
+ result.addUnfilteredQueue(queue);
+ result.removeFilteredQueue(queue, FilterSupport.createMessageFilter(oldArgs, queue));
+ }
+ else
+ {
+ // TODO - fix control flow
+ return;
+ }
+ }
+
+ }
+ }
+ catch (AMQInvalidArgumentException e)
+ {
+ throw new ConnectionScopedRuntimeException(e);
+ }
+
+
+ }
+
protected synchronized void registerQueue(final BindingImpl binding) throws AMQInvalidArgumentException
{
final String bindingKey = binding.getBindingKey();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java
new file mode 100644
index 0000000000..550377a02f
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchangeFactory.java
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.exchange;
+
+import java.util.Map;
+
+import org.apache.qpid.server.model.AbstractConfiguredObjectTypeFactory;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
+
+public class TopicExchangeFactory extends AbstractConfiguredObjectTypeFactory<TopicExchange>
+{
+ public TopicExchangeFactory()
+ {
+ super(TopicExchange.class);
+ }
+
+ @Override
+ public TopicExchange createInstance(Map<String, Object> attributes, ConfiguredObject<?>... parents)
+ {
+ VirtualHost<?,?,?> virtualHost = getParent(VirtualHost.class, parents);
+ if (!(virtualHost instanceof VirtualHostImpl))
+ {
+ throw new IllegalArgumentException("Unexpected virtual host is set as a parent. Expected instance of " + VirtualHostImpl.class.getName());
+ }
+ return new TopicExchange((VirtualHostImpl<?, ?, ?>)virtualHost , attributes);
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index e11550d575..a04635ec74 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -41,6 +41,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -104,7 +105,12 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
new ArrayList<ConfigurationChangeListener>();
private final Map<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>> _children =
- new HashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>();
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Collection<ConfiguredObject<?>>>();
+ private final Map<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>> _childrenById =
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<UUID,ConfiguredObject<?>>>();
+ private final Map<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>> _childrenByName =
+ new ConcurrentHashMap<Class<? extends ConfiguredObject>, Map<String,ConfiguredObject<?>>>();
+
@ManagedAttributeField
private final UUID _id;
@@ -178,6 +184,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
if(idObj == null)
{
uuid = UUID.randomUUID();
+ attributes = new HashMap<String, Object>(attributes);
+ attributes.put(ID, uuid);
}
else
{
@@ -185,6 +193,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
_id = uuid;
+ _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this);
_attributeTypes = getAttributeTypes(getClass());
_automatedFields = getAutomatedFields(getClass());
@@ -205,6 +214,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
for (Class<? extends ConfiguredObject> childClass : Model.getInstance().getChildTypes(getCategoryClass()))
{
_children.put(childClass, new CopyOnWriteArrayList<ConfiguredObject<?>>());
+ _childrenById.put(childClass, new ConcurrentHashMap<UUID, ConfiguredObject<?>>());
+ _childrenByName.put(childClass, new ConcurrentHashMap<String, ConfiguredObject<?>>());
}
for(ConfiguredObject<?> parent : parents.values())
@@ -220,7 +231,6 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
addParent((Class<ConfiguredObject<?>>) entry.getKey(), entry.getValue());
}
- _name = AttributeValueConverter.STRING_CONVERTER.convert(attributes.get(NAME),this);
Object durableObj = attributes.get(DURABLE);
_durable = AttributeValueConverter.BOOLEAN_CONVERTER.convert(durableObj == null ? _attributeTypes.get(DURABLE).getAnnotation().defaultValue() : durableObj, this);
@@ -353,7 +363,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- public void open()
+ public final void open()
{
if(_open.compareAndSet(false,true))
{
@@ -364,7 +374,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
- public void create()
+ public final void create()
{
if(_open.compareAndSet(false,true))
{
@@ -391,7 +401,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
});
}
- protected void doValidation()
+ protected final void doValidation()
{
applyToChildren(new Action<ConfiguredObject<?>>()
{
@@ -407,7 +417,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
validate();
}
- protected void doResolution()
+ protected final void doResolution()
{
resolve();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -423,7 +433,7 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
});
}
- protected void doCreation()
+ protected final void doCreation()
{
onCreate();
applyToChildren(new Action<ConfiguredObject<?>>()
@@ -454,7 +464,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
}
}
- public void validate()
+ public void
+ validate()
{
}
@@ -868,29 +879,78 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
private <C extends ConfiguredObject> void registerChild(final C child)
{
+
Class categoryClass = child.getCategoryClass();
+ UUID childId = child.getId();
+ String name = child.getName();
+ if(_childrenById.get(categoryClass).containsKey(childId))
+ {
+ throw new DuplicateIdException(child);
+ }
+ if(_childrenByName.get(categoryClass).containsKey(name))
+ {
+ Collection<Class<? extends ConfiguredObject>> parentTypes =
+ new ArrayList<Class<? extends ConfiguredObject>>(Model.getInstance().getParentTypes(categoryClass));
+ parentTypes.remove(getCategoryClass());
+ boolean duplicate = true;
+
+ C existing = (C) _childrenByName.get(categoryClass).get(name);
+ for(Class<? extends ConfiguredObject> parentType : parentTypes)
+ {
+ ConfiguredObject existingParent = existing.getParent(parentType);
+ ConfiguredObject childParent = child.getParent(parentType);
+ duplicate = existingParent == childParent;
+ if(!duplicate)
+ {
+ break;
+ }
+ }
+
+ if(duplicate)
+ {
+ throw new DuplicateNameException(child);
+ }
+ }
_children.get(categoryClass).add(child);
+ _childrenById.get(categoryClass).put(childId,child);
+ _childrenByName.get(categoryClass).put(name, child);
+
}
protected void deleted()
{
- for(ConfiguredObject<?> parent : _parents.values())
+ for (ConfiguredObject<?> parent : _parents.values())
{
- if(parent instanceof AbstractConfiguredObject<?>)
+ if (parent instanceof AbstractConfiguredObject<?>)
{
- ((AbstractConfiguredObject<?>)parent).unregisterChild(this);
+ AbstractConfiguredObject<?> parentObj = (AbstractConfiguredObject<?>) parent;
+ parentObj.unregisterChild(this);
+ parentObj.childRemoved(this);
}
}
}
- protected <C extends ConfiguredObject> void unregisterChild(final C child)
+ private <C extends ConfiguredObject> void unregisterChild(final C child)
{
- _children.get(child.getCategoryClass()).remove(child);
+ Class categoryClass = child.getCategoryClass();
+ _children.get(categoryClass).remove(child);
+ _childrenById.get(categoryClass).remove(child.getId());
+ _childrenByName.get(categoryClass).remove(child.getName());
}
+ @Override
+ public final <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id)
+ {
+ return (C) _childrenById.get(Model.getCategory(clazz)).get(id);
+ }
+ @Override
+ public final <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name)
+ {
+ return (C) _childrenByName.get(Model.getCategory(clazz)).get(name);
+ }
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(final Class<C> clazz)
@@ -1581,7 +1641,8 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
int oldSize = 0;
Model model = Model.getInstance();
- Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes(candidate));
+ Set<Class<? extends ConfiguredObject>> allDescendants = new HashSet<Class<? extends ConfiguredObject>>(model.getChildTypes(
+ candidate));
while(allDescendants.size() > oldSize)
{
oldSize = allDescendants.size();
@@ -1661,4 +1722,27 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
throw new ServerScopedRuntimeException("Unable to find attribute definition for method " + method.getName());
}
}
+
+ protected final static class DuplicateIdException extends RuntimeException
+ {
+ public DuplicateIdException(final ConfiguredObject<?> child)
+ {
+ super("Child of type " + child.getClass().getSimpleName() + " already exists with id of " + child.getId());
+ }
+ }
+
+ protected final static class DuplicateNameException extends RuntimeException
+ {
+ private final String _name;
+ public DuplicateNameException(final ConfiguredObject<?> child)
+ {
+ super("Child of type " + child.getClass().getSimpleName() + " already exists with name of " + child.getName());
+ _name = child.getName();
+ }
+
+ public String getName()
+ {
+ return _name;
+ }
+ }
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
index 5285a7b611..dbf138b8a2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Broker.java
@@ -194,4 +194,6 @@ public interface Broker<X extends Broker<X>> extends ConfiguredObject<X>, EventL
void setEventLogger(EventLogger eventLogger);
AuthenticationProvider<?> getManagementModeAuthenticationProvider();
+
+ ConfiguredObjectFactory getObjectFactory();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 5480b1f415..54bab071fd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -233,6 +233,10 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
*/
<C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz);
+ <C extends ConfiguredObject> C getChildById(Class<C> clazz, UUID id);
+
+ <C extends ConfiguredObject> C getChildByName(Class<C> clazz, String name);
+
<C extends ConfiguredObject> C createChild(Class<C> childClass,
Map<String, Object> attributes,
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
index 1c5afa6175..35d7df4385 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactory.java
@@ -20,133 +20,25 @@
*/
package org.apache.qpid.server.model;
-import java.util.ArrayList;
import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
import java.util.Map;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
-import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.UnresolvedConfiguredObject;
-import org.apache.qpid.server.util.ServerScopedRuntimeException;
-public class ConfiguredObjectFactory
+public interface ConfiguredObjectFactory
{
- private final Map<String, String> _defaultTypes = new HashMap<String, String>();
- private final Map<String, Map<String, ConfiguredObjectTypeFactory>> _allFactories =
- new HashMap<String, Map<String, ConfiguredObjectTypeFactory>>();
- private final Map<String, Collection<String>> _supportedTypes = new HashMap<String, Collection<String>>();
+ <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record,
+ ConfiguredObject<?>... parents);
- private final Model _model;
+ <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(Class<X> categoryClass,
+ Map<String, Object> attributes);
- public ConfiguredObjectFactory(Model model)
- {
- _model = model;
+ <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(String category,
+ String type);
- QpidServiceLoader<ConfiguredObjectTypeFactory> serviceLoader = new QpidServiceLoader<ConfiguredObjectTypeFactory>();
- Iterable<ConfiguredObjectTypeFactory> allFactories = serviceLoader.instancesOf(ConfiguredObjectTypeFactory.class);
- for(ConfiguredObjectTypeFactory factory : allFactories)
- {
- final Class<? extends ConfiguredObject> categoryClass = factory.getCategoryClass();
- final String categoryName = categoryClass.getSimpleName();
-
- Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(categoryName);
- if(categoryFactories == null)
- {
- categoryFactories = new HashMap<String, ConfiguredObjectTypeFactory>();
- _allFactories.put(categoryName, categoryFactories);
- _supportedTypes.put(categoryName, new ArrayList<String>());
- ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class);
- if(annotation != null && !"".equals(annotation.defaultType()))
- {
- _defaultTypes.put(categoryName, annotation.defaultType());
- }
-
- }
- if(categoryFactories.put(factory.getType(),factory) != null)
- {
- throw new ServerScopedRuntimeException("Misconfiguration - there is more than one factory defined for class " + categoryName
- + " with type " + factory.getType());
- }
- if(factory.getType() != null)
- {
- _supportedTypes.get(categoryName).add(factory.getType());
- }
- }
- }
-
- public <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record,
- ConfiguredObject<?>... parents)
- {
- String category = record.getType();
-
-
- String type = (String) record.getAttributes().get(ConfiguredObject.TYPE);
-
- ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(category, type);
-
- if(factory == null)
- {
- throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category + " and type " + type);
- }
-
- return factory.recover(record, parents);
- }
-
- public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass, Map<String,Object> attributes)
- {
- final String category = categoryClass.getSimpleName();
- Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
- if(categoryFactories == null)
- {
- throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category);
- }
- String type = (String) attributes.get(ConfiguredObject.TYPE);
-
- ConfiguredObjectTypeFactory<X> factory;
-
- if(type != null)
- {
- factory = getConfiguredObjectTypeFactory(category, type);
- }
- else
- {
- factory = getConfiguredObjectTypeFactory(category, null);
- if(factory == null)
- {
- ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class);
- factory = getConfiguredObjectTypeFactory(category, annotation.defaultType());
- }
- }
- return factory;
- }
-
- public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category, final String type)
- {
- Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
- if(categoryFactories == null)
- {
- throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category);
- }
- ConfiguredObjectTypeFactory factory = categoryFactories.get(type);
- if(factory == null)
- {
- factory = categoryFactories.get(_defaultTypes.get(category));
- }
- return factory;
- }
-
- public Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category)
- {
- return Collections.unmodifiableCollection(_supportedTypes.get(category.getSimpleName()));
- }
-
-
- public Model getModel()
- {
- return _model;
- }
+ Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category);
+ Model getModel();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
new file mode 100644
index 0000000000..57062cb7a2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObjectFactoryImpl.java
@@ -0,0 +1,159 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.model;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
+import org.apache.qpid.server.util.ServerScopedRuntimeException;
+
+public class ConfiguredObjectFactoryImpl implements ConfiguredObjectFactory
+{
+ private final Map<String, String> _defaultTypes = new HashMap<String, String>();
+ private final Map<String, Map<String, ConfiguredObjectTypeFactory>> _allFactories =
+ new HashMap<String, Map<String, ConfiguredObjectTypeFactory>>();
+ private final Map<String, Collection<String>> _supportedTypes = new HashMap<String, Collection<String>>();
+
+ private final Model _model;
+
+ public ConfiguredObjectFactoryImpl(Model model)
+ {
+ _model = model;
+
+ QpidServiceLoader<ConfiguredObjectTypeFactory> serviceLoader = new QpidServiceLoader<ConfiguredObjectTypeFactory>();
+ Iterable<ConfiguredObjectTypeFactory> allFactories = serviceLoader.instancesOf(ConfiguredObjectTypeFactory.class);
+ for(ConfiguredObjectTypeFactory factory : allFactories)
+ {
+ final Class<? extends ConfiguredObject> categoryClass = factory.getCategoryClass();
+ final String categoryName = categoryClass.getSimpleName();
+
+ Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(categoryName);
+ if(categoryFactories == null)
+ {
+ categoryFactories = new HashMap<String, ConfiguredObjectTypeFactory>();
+ _allFactories.put(categoryName, categoryFactories);
+ _supportedTypes.put(categoryName, new ArrayList<String>());
+ ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class);
+ if(annotation != null && !"".equals(annotation.defaultType()))
+ {
+ _defaultTypes.put(categoryName, annotation.defaultType());
+ }
+
+ }
+ if(categoryFactories.put(factory.getType(),factory) != null)
+ {
+ throw new ServerScopedRuntimeException("Misconfiguration - there is more than one factory defined for class " + categoryName
+ + " with type " + factory.getType());
+ }
+ if(factory.getType() != null)
+ {
+ _supportedTypes.get(categoryName).add(factory.getType());
+ }
+ }
+ }
+
+ @Override
+ public <X extends ConfiguredObject<X>> UnresolvedConfiguredObject<X> recover(ConfiguredObjectRecord record,
+ ConfiguredObject<?>... parents)
+ {
+ String category = record.getType();
+
+
+ String type = (String) record.getAttributes().get(ConfiguredObject.TYPE);
+
+ ConfiguredObjectTypeFactory<X> factory = getConfiguredObjectTypeFactory(category, type);
+
+ if(factory == null)
+ {
+ throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category + " and type " + type);
+ }
+
+ return factory.recover(record, parents);
+ }
+
+ @Override
+ public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final Class<X> categoryClass,
+ Map<String, Object> attributes)
+ {
+ final String category = categoryClass.getSimpleName();
+ Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
+ if(categoryFactories == null)
+ {
+ throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category);
+ }
+ String type = (String) attributes.get(ConfiguredObject.TYPE);
+
+ ConfiguredObjectTypeFactory<X> factory;
+
+ if(type != null)
+ {
+ factory = getConfiguredObjectTypeFactory(category, type);
+ }
+ else
+ {
+ factory = getConfiguredObjectTypeFactory(category, null);
+ if(factory == null)
+ {
+ ManagedObject annotation = categoryClass.getAnnotation(ManagedObject.class);
+ factory = getConfiguredObjectTypeFactory(category, annotation.defaultType());
+ }
+ }
+ return factory;
+ }
+
+ @Override
+ public <X extends ConfiguredObject<X>> ConfiguredObjectTypeFactory<X> getConfiguredObjectTypeFactory(final String category,
+ final String type)
+ {
+ Map<String, ConfiguredObjectTypeFactory> categoryFactories = _allFactories.get(category);
+ if(categoryFactories == null)
+ {
+ throw new ServerScopedRuntimeException("No factory defined for ConfiguredObject of category " + category);
+ }
+ ConfiguredObjectTypeFactory factory = categoryFactories.get(type);
+ if(factory == null)
+ {
+ factory = categoryFactories.get(_defaultTypes.get(category));
+ }
+ return factory;
+ }
+
+ @Override
+ public Collection<String> getSupportedTypes(Class<? extends ConfiguredObject> category)
+ {
+ return Collections.unmodifiableCollection(_supportedTypes.get(category.getSimpleName()));
+ }
+
+
+ @Override
+ public Model getModel()
+ {
+ return _model;
+ }
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
index 95cb614d44..0ddc392430 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/Session.java
@@ -56,4 +56,6 @@ public interface Session<X extends Session<X>> extends ConfiguredObject<X>
@ManagedStatistic
long getUnacknowledgedMessages();
+
+ void delete();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
index a9e9225a26..d0b4809303 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/BrokerAdapter.java
@@ -166,11 +166,13 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
String modelVersion = (String) getActualAttributes().get(Broker.MODEL_VERSION);
if (modelVersion == null)
{
+ deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " must be specified");
}
if (!MODEL_VERSION_PATTERN.matcher(modelVersion).matches())
{
+ deleted();
throw new IllegalConfigurationException("Broker " + Broker.MODEL_VERSION + " is specified in incorrect format: "
+ modelVersion);
}
@@ -182,12 +184,14 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
if (majorModelVersion != Model.MODEL_MAJOR_VERSION || minorModelVersion > Model.MODEL_MINOR_VERSION)
{
+ deleted();
throw new IllegalConfigurationException("The model version '" + modelVersion
+ "' in configuration is incompatible with the broker model version '" + Model.MODEL_VERSION + "'");
}
if(!isDurable())
{
+ deleted();
throw new IllegalArgumentException(getClass().getSimpleName() + " must be durable");
}
}
@@ -1203,6 +1207,12 @@ public class BrokerAdapter extends AbstractConfiguredObject<BrokerAdapter> imple
}
@Override
+ public ConfiguredObjectFactory getObjectFactory()
+ {
+ return _objectFactory;
+ }
+
+ @Override
public void setEventLogger(final EventLogger eventLogger)
{
_eventLogger = eventLogger;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
index 5dc47f04aa..047ea4ef97 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProvider.java
@@ -27,6 +27,8 @@ import org.apache.qpid.server.model.ManagedObject;
@ManagedObject( category = false, type = "GroupFile" )
public interface FileBasedGroupProvider<X extends FileBasedGroupProvider<X>> extends GroupProvider<X>
{
+ String PATH="path";
+
@ManagedAttribute( automate = true, mandatory = true)
String getPath();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
index afcfe93618..be14b284b1 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/FileBasedGroupProviderImpl.java
@@ -23,28 +23,49 @@ import java.io.File;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.Principal;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.security.access.Operation;
-import org.apache.qpid.server.security.group.FileGroupManager;
-import org.apache.qpid.server.security.group.GroupManager;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Group;
+import org.apache.qpid.server.model.GroupMember;
+import org.apache.qpid.server.model.GroupProvider;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.security.SecurityManager;
+import org.apache.qpid.server.security.access.Operation;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
+import org.apache.qpid.server.security.group.FileGroupDatabase;
+import org.apache.qpid.server.security.group.GroupPrincipal;
import org.apache.qpid.server.util.MapValueConverter;
public class FileBasedGroupProviderImpl
extends AbstractConfiguredObject<FileBasedGroupProviderImpl> implements FileBasedGroupProvider<FileBasedGroupProviderImpl>
{
+ public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.group.FileGroupProviderAttributeDescriptions";
+ public static final String GROUP_FILE_PROVIDER_TYPE = "GroupFile";
private static Logger LOGGER = Logger.getLogger(FileBasedGroupProviderImpl.class);
- private GroupManager _groupManager;
private final Broker<?> _broker;
private AtomicReference<State> _state;
+ private FileGroupDatabase _groupDatabase;
+
@ManagedAttributeField
private String _path;
@@ -100,18 +121,74 @@ public class FileBasedGroupProviderImpl
protected void onOpen()
{
super.onOpen();
- if(_groupManager == null)
+ if(_groupDatabase == null)
+ {
+ _groupDatabase = new FileGroupDatabase();
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ setState(getState(), State.ERRORED);
+ LOGGER.warn(("Unable to open preferences file at " + _path));
+ }
+ }
+ Set<Principal> groups = getGroupPrincipals();
+ Collection<Group> principals = new ArrayList<Group>(groups.size());
+ for (Principal group : groups)
{
- _groupManager = new FileGroupManager(getPath());
+ Map<String,Object> attrMap = new HashMap<String, Object>();
+ UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName());
+ attrMap.put(Group.ID, id);
+ attrMap.put(Group.NAME, group.getName());
+ GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
+ principals.add(groupAdapter);
}
+
}
@Override
protected void onCreate()
{
super.onCreate();
- _groupManager = new FileGroupManager(getPath());
- _groupManager.onCreate();
+ _groupDatabase = new FileGroupDatabase();
+
+ File file = new File(_path);
+ if (!file.exists())
+ {
+ File parent = file.getParentFile();
+ if (!parent.exists())
+ {
+ parent.mkdirs();
+ }
+ if (parent.exists())
+ {
+ try
+ {
+ file.createNewFile();
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException("Cannot create group file");
+ }
+ }
+ else
+ {
+ throw new IllegalConfigurationException("Cannot create group file");
+ }
+ }
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ setState(getState(), State.ERRORED);
+ LOGGER.warn(("Unable to open preferences file at " + _path));
+ }
+
+
}
@Override
@@ -147,12 +224,16 @@ public class FileBasedGroupProviderImpl
String groupName = (String) attributes.get(Group.NAME);
getSecurityManager().authoriseGroupOperation(Operation.CREATE, groupName);
- _groupManager.createGroup(groupName);
+
+ _groupDatabase.createGroup(groupName);
+
Map<String,Object> attrMap = new HashMap<String, Object>();
UUID id = UUIDGenerator.generateGroupUUID(getName(),groupName);
attrMap.put(Group.ID, id);
attrMap.put(Group.NAME, groupName);
- return (C) new GroupAdapter(attrMap, getTaskExecutor());
+ GroupAdapter groupAdapter = new GroupAdapter(attrMap, getTaskExecutor());
+ groupAdapter.create();
+ return (C) groupAdapter;
}
@@ -161,35 +242,25 @@ public class FileBasedGroupProviderImpl
+ childClass);
}
- @SuppressWarnings("unchecked")
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
+ private Set<Principal> getGroupPrincipals()
{
- if (clazz == Group.class)
+
+ Set<String> groups = _groupDatabase == null ? Collections.<String>emptySet() : _groupDatabase.getAllGroups();
+ if (groups.isEmpty())
{
- Set<Principal> groups = _groupManager == null ? Collections.<Principal>emptySet() : _groupManager.getGroupPrincipals();
- Collection<Group> principals = new ArrayList<Group>(groups.size());
- for (Principal group : groups)
- {
- Map<String,Object> attrMap = new HashMap<String, Object>();
- UUID id = UUIDGenerator.generateGroupUUID(getName(),group.getName());
- attrMap.put(Group.ID, id);
- attrMap.put(Group.NAME, group.getName());
- principals.add(new GroupAdapter(attrMap, getTaskExecutor()));
- }
- return (Collection<C>) Collections
- .unmodifiableCollection(principals);
+ return Collections.emptySet();
}
else
{
- return null;
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String groupName : groups)
+ {
+ principals.add(new GroupPrincipal(groupName));
+ }
+ return principals;
}
}
- public GroupManager getGroupManager()
- {
- return _groupManager;
- }
private SecurityManager getSecurityManager()
{
@@ -207,7 +278,15 @@ public class FileBasedGroupProviderImpl
{
try
{
- _groupManager.open();
+ try
+ {
+ _groupDatabase.setGroupFile(getPath());
+ }
+ catch (IOException e)
+ {
+ throw new IllegalConfigurationException("Unable to set group file " + getPath(), e);
+ }
+
return true;
}
catch(RuntimeException e)
@@ -232,7 +311,6 @@ public class FileBasedGroupProviderImpl
{
if (_state.compareAndSet(state, State.STOPPED))
{
- _groupManager.close();
return true;
}
else
@@ -245,8 +323,15 @@ public class FileBasedGroupProviderImpl
if ((state == State.INITIALISING || state == State.ACTIVE || state == State.STOPPED || state == State.QUIESCED || state == State.ERRORED)
&& _state.compareAndSet(state, State.DELETED))
{
- _groupManager.close();
- _groupManager.onDelete();
+ File file = new File(getPath());
+ if (file.exists())
+ {
+ if (!file.delete())
+ {
+ throw new IllegalConfigurationException("Cannot delete group file");
+ }
+ }
+
deleted();
return true;
}
@@ -267,7 +352,20 @@ public class FileBasedGroupProviderImpl
public Set<Principal> getGroupPrincipalsForUser(String username)
{
- return _groupManager.getGroupPrincipalsForUser(username);
+ Set<String> groups = _groupDatabase.getGroupsForUser(username);
+ if (groups.isEmpty())
+ {
+ return Collections.emptySet();
+ }
+ else
+ {
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String groupName : groups)
+ {
+ principals.add(new GroupPrincipal(groupName));
+ }
+ return principals;
+ }
}
@Override
@@ -337,6 +435,24 @@ public class FileBasedGroupProviderImpl
}
@Override
+ protected void onOpen()
+ {
+ super.onOpen();
+ Set<Principal> usersInGroup = getUserPrincipalsForGroup(getName());
+ Collection<GroupMember> members = new ArrayList<GroupMember>();
+ for (Principal principal : usersInGroup)
+ {
+ UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName());
+ Map<String,Object> attrMap = new HashMap<String, Object>();
+ attrMap.put(GroupMember.ID,id);
+ attrMap.put(GroupMember.NAME, principal.getName());
+ GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor());
+ groupMemberAdapter.open();
+ members.add(groupMemberAdapter);
+ }
+ }
+
+ @Override
protected void validateChange(final ConfiguredObject<?> proxyForValidation, final Set<String> changedAttributes)
{
super.validateChange(proxyForValidation, changedAttributes);
@@ -346,33 +462,25 @@ public class FileBasedGroupProviderImpl
}
}
- @Override
- public <C extends ConfiguredObject> Collection<C> getChildren(
- Class<C> clazz)
+ private Set<Principal> getUserPrincipalsForGroup(String group)
{
- if (clazz == GroupMember.class)
+ Set<String> users = _groupDatabase.getUsersInGroup(group);
+ if (users.isEmpty())
{
- Set<Principal> usersInGroup = _groupManager
- .getUserPrincipalsForGroup(getName());
- Collection<GroupMember> members = new ArrayList<GroupMember>();
- for (Principal principal : usersInGroup)
- {
- UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), principal.getName());
- Map<String,Object> attrMap = new HashMap<String, Object>();
- attrMap.put(GroupMember.ID,id);
- attrMap.put(GroupMember.NAME, principal.getName());
- members.add(new GroupMemberAdapter(attrMap, getTaskExecutor()));
- }
- return (Collection<C>) Collections
- .unmodifiableCollection(members);
+ return Collections.emptySet();
}
else
{
- return null;
+ Set<Principal> principals = new HashSet<Principal>();
+ for (String user : users)
+ {
+ principals.add(new UsernamePrincipal(user));
+ }
+ return principals;
}
-
}
+
@Override
public <C extends ConfiguredObject> C addChild(Class<C> childClass,
Map<String, Object> attributes,
@@ -384,12 +492,14 @@ public class FileBasedGroupProviderImpl
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, getName());
- _groupManager.addUserToGroup(memberName, getName());
+ _groupDatabase.addUserToGroup(memberName, getName());
UUID id = UUIDGenerator.generateGroupMemberUUID(FileBasedGroupProviderImpl.this.getName(), getName(), memberName);
Map<String,Object> attrMap = new HashMap<String, Object>();
attrMap.put(GroupMember.ID,id);
attrMap.put(GroupMember.NAME, memberName);
- return (C) new GroupMemberAdapter(attrMap, getTaskExecutor());
+ GroupMemberAdapter groupMemberAdapter = new GroupMemberAdapter(attrMap, getTaskExecutor());
+ groupMemberAdapter.create();
+ return (C) groupMemberAdapter;
}
@@ -405,7 +515,8 @@ public class FileBasedGroupProviderImpl
if (desiredState == State.DELETED)
{
getSecurityManager().authoriseGroupOperation(Operation.DELETE, getName());
- _groupManager.removeGroup(getName());
+ _groupDatabase.removeGroup(getName());
+ deleted();
return true;
}
@@ -479,7 +590,8 @@ public class FileBasedGroupProviderImpl
{
getSecurityManager().authoriseGroupOperation(Operation.UPDATE, GroupAdapter.this.getName());
- _groupManager.removeUserFromGroup(getName(), GroupAdapter.this.getName());
+ _groupDatabase.removeUserFromGroup(getName(), GroupAdapter.this.getName());
+ deleted();
return true;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
index 13d80bbe0e..3b55fcb1bd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java
@@ -24,12 +24,17 @@ import java.security.AccessControlException;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
-
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
+import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Publisher;
+import org.apache.qpid.server.model.Session;
+import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
@@ -59,8 +64,10 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
public void consumerRemoved(final Consumer<?> consumer)
{
childRemoved(consumer);
+
}
});
+ session.setModelObject(this);
open();
}
@@ -159,6 +166,12 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl
return _session.getUnacknowledgedMessageCount();
}
+ @Override
+ public void delete()
+ {
+ deleted();
+ }
+
@Override
protected boolean setState(State currentState, State desiredState)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
index be9240c64c..fd8e680bef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/port/PortFactory.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
@@ -133,7 +134,7 @@ public class PortFactory<X extends Port<X>> implements ConfiguredObjectTypeFacto
{
if(_configuredObjectFactory == null)
{
- _configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance());
+ _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
}
}
return _configuredObjectFactory.getConfiguredObjectTypeFactory(Port.class.getSimpleName(), type);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
index e7c14a7a4d..0df40bfff6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java
@@ -27,6 +27,7 @@ import java.util.concurrent.ConcurrentSkipListSet;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.logging.LogSubject;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.util.Deletable;
@@ -94,4 +95,8 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo
void addConsumerListener(ConsumerListener listener);
void removeConsumerListener(ConsumerListener listener);
+
+ void setModelObject(Session<?> session);
+
+ Session<?> getModelObject();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
index 8b5ea1e964..9a06569397 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueueFactory.java
@@ -29,7 +29,6 @@ import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
@@ -191,7 +190,7 @@ public class AMQQueueFactory implements QueueFactory
{
- args.put(Queue.ID, UUIDGenerator.generateQueueUUID(dlQueueName, _virtualHost.getName()));
+ args.put(Queue.ID, UUID.randomUUID());
args.put(Queue.NAME, dlQueueName);
args.put(Queue.DURABLE, true);
dlQueue = _virtualHost.createQueue(args);
@@ -269,7 +268,7 @@ public class AMQQueueFactory implements QueueFactory
private static String getDeadLetterExchangeName(String name)
{
- return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX);
+ return name + System.getProperty(BrokerProperties.PROPERTY_DEAD_LETTER_EXCHANGE_SUFFIX, VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX);
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
index fb5b5dc8bf..7738281034 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java
@@ -826,6 +826,7 @@ public abstract class AbstractQueue
// we need to manually fire the event to the removed consumer (which was the last one left for this
// queue. This is because the delete method uses the consumer set which has just been cleared
consumer.queueDeleted();
+
}
}
@@ -1645,7 +1646,7 @@ public abstract class AbstractQueue
_deleteTaskList.clear();
stop();
-
+ deleted();
//Log Queue Deletion
getEventLogger().message(_logSubject, QueueMessages.DELETED());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
index c2778085d1..a54b3ae7f0 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefaultQueueRegistry.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
@@ -33,8 +32,6 @@ public class DefaultQueueRegistry implements QueueRegistry
private ConcurrentMap<String, AMQQueue<?>> _queueMap = new ConcurrentHashMap<String, AMQQueue<?>>();
private final VirtualHostImpl _virtualHost;
- private final Collection<RegistryChangeListener> _listeners =
- new ArrayList<RegistryChangeListener>();
public DefaultQueueRegistry(VirtualHostImpl virtualHost)
{
@@ -49,28 +46,11 @@ public class DefaultQueueRegistry implements QueueRegistry
public void registerQueue(AMQQueue queue)
{
_queueMap.put(queue.getName(), queue);
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.queueRegistered(queue);
- }
- }
}
public void unregisterQueue(String name)
{
AMQQueue q = _queueMap.remove(name);
- if(q != null)
- {
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.queueUnregistered(q);
- }
- }
- }
}
@@ -84,31 +64,12 @@ public class DefaultQueueRegistry implements QueueRegistry
return queue == null ? null : _queueMap.get(queue);
}
- public void addRegistryChangeListener(RegistryChangeListener listener)
- {
- synchronized(_listeners)
- {
- _listeners.add(listener);
- }
- }
-
@Override
- public void stopAllAndUnregisterMBeans()
+ public void close()
{
for (final AMQQueue queue : getQueues())
{
queue.stop();
-
- //TODO: this is a bit of a hack, what if the listeners aren't aware
- //that we are just unregistering the MBean because of HA, and aren't
- //actually removing the queue as such.
- synchronized (_listeners)
- {
- for(RegistryChangeListener listener : _listeners)
- {
- listener.queueUnregistered(queue);
- }
- }
}
_queueMap.clear();
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
index ae1963d2a3..b3d4e4368f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java
@@ -115,7 +115,7 @@ class QueueConsumerImpl
final Class<? extends ServerMessage> messageClass,
EnumSet<Option> optionSet)
{
- super(parentsMap(queue),
+ super(parentsMap(queue, target.getSessionModel().getModelObject()),
createAttributeMap(consumerName, filters, optionSet),
queue.getVirtualHost().getTaskExecutor());
_messageClass = messageClass;
@@ -255,6 +255,7 @@ class QueueConsumerImpl
_target.close();
_target.consumerRemoved(this);
_queue.unregisterConsumer(this);
+ deleted();
}
finally
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
index 747e0d8959..5a90536476 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueRegistry.java
@@ -37,9 +37,7 @@ public interface QueueRegistry
AMQQueue<?> getQueue(String queue);
- void addRegistryChangeListener(RegistryChangeListener listener);
-
- void stopAllAndUnregisterMBeans();
+ void close();
AMQQueue<?> getQueue(UUID queueId);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
index 01281676b1..d3eea96e16 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/AbstractAuthenticationManager.java
@@ -37,6 +37,7 @@ import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.IntegrityViolationException;
import org.apache.qpid.server.model.Model;
@@ -158,7 +159,7 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
if(childClass == PreferencesProvider.class)
{
// TODO RG - get the configured object factory from parents
- ConfiguredObjectFactory factory = new ConfiguredObjectFactory(Model.getInstance());
+ ConfiguredObjectFactory factory = new ConfiguredObjectFactoryImpl(Model.getInstance());
attributes = new HashMap<String, Object>(attributes);
attributes.put(ConfiguredObject.ID, UUID.randomUUID());
final ConfiguredObjectTypeFactory preferencesFactory =
@@ -236,7 +237,6 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
{
try
{
- initialise();
if (_preferencesProvider != null)
{
_preferencesProvider.setDesiredState(_preferencesProvider.getState(), State.ACTIVE);
@@ -256,6 +256,10 @@ public abstract class AbstractAuthenticationManager<T extends AbstractAuthentica
}
}
}
+ if(state == State.ERRORED)
+ {
+ return false;
+ }
else
{
throw new IllegalStateException("Cannot activate authentication provider in state: " + state);
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
index 3e756f5210..cb3729e4e3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/PrincipalDatabaseAuthenticationManager.java
@@ -25,26 +25,35 @@ import java.io.FileNotFoundException;
import java.io.IOException;
import java.security.AccessControlException;
import java.security.Principal;
-import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
import javax.security.auth.login.AccountNotFoundException;
import javax.security.sasl.SaslException;
import javax.security.sasl.SaslServer;
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.AbstractConfiguredObject;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ExternalFileBasedAuthenticationManager;
+import org.apache.qpid.server.model.IllegalStateTransitionException;
+import org.apache.qpid.server.model.ManagedAttributeField;
+import org.apache.qpid.server.model.PreferencesProvider;
+import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.User;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.security.auth.AuthenticationResult;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.AuthenticationResult.AuthenticationStatus;
+import org.apache.qpid.server.security.auth.UsernamePrincipal;
import org.apache.qpid.server.security.auth.database.PrincipalDatabase;
public abstract class PrincipalDatabaseAuthenticationManager<T extends PrincipalDatabaseAuthenticationManager<T>>
@@ -55,6 +64,8 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
private static final Logger LOGGER = Logger.getLogger(PrincipalDatabaseAuthenticationManager.class);
+ private final Map<Principal, PrincipalAdapter> _userMap = new ConcurrentHashMap<Principal, PrincipalAdapter>();
+
private PrincipalDatabase _principalDatabase;
@ManagedAttributeField
private String _path;
@@ -92,6 +103,21 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
{
super.onOpen();
_principalDatabase = createDatabase();
+ try
+ {
+ initialise();
+ List<Principal> users =
+ _principalDatabase == null ? Collections.<Principal>emptyList() : _principalDatabase.getUsers();
+ for (Principal user : users)
+ {
+ _userMap.put(user, new PrincipalAdapter(user));
+ }
+ }
+ catch(IllegalConfigurationException e)
+ {
+ updateState(getState(), State.ERRORED);
+
+ }
}
protected abstract PrincipalDatabase createDatabase();
@@ -202,16 +228,41 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
public boolean createUser(String username, String password, Map<String, String> attributes)
{
getSecurityManager().authoriseUserOperation(Operation.CREATE, username);
- return getPrincipalDatabase().createPrincipal(new UsernamePrincipal(username), password.toCharArray());
+ Principal principal = new UsernamePrincipal(username);
+ boolean created =
+ getPrincipalDatabase().createPrincipal(principal, password.toCharArray());
+ if(created)
+ {
+ principal = getPrincipalDatabase().getUser(username);
+
+ _userMap.put(principal, new PrincipalAdapter(principal));
+ }
+ return created;
}
- @Override
- public void deleteUser(String username) throws AccountNotFoundException
+
+ private void deleteUserFromDatabase(String username) throws AccountNotFoundException
{
getSecurityManager().authoriseUserOperation(Operation.DELETE, username);
- getPrincipalDatabase().deletePrincipal(new UsernamePrincipal(username));
+ UsernamePrincipal principal = new UsernamePrincipal(username);
+ getPrincipalDatabase().deletePrincipal(principal);
+ _userMap.remove(principal);
+ }
+ @Override
+ public void deleteUser(String username) throws AccountNotFoundException
+ {
+ UsernamePrincipal principal = new UsernamePrincipal(username);
+ PrincipalAdapter user = _userMap.get(principal);
+ if(user != null)
+ {
+ user.setState(user.getState(), State.DELETED);
+ }
+ else
+ {
+ deleteUserFromDatabase(username);
+ }
}
private org.apache.qpid.server.security.SecurityManager getSecurityManager()
@@ -258,9 +309,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
if(createUser(username, password,null))
{
- @SuppressWarnings("unchecked")
- C principalAdapter = (C) new PrincipalAdapter(p);
- return principalAdapter;
+ return (C) _userMap.get(p);
}
else
{
@@ -276,23 +325,7 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == User.class)
- {
- PrincipalDatabase principalDatabase = getPrincipalDatabase();
- List<Principal> users = principalDatabase == null ? Collections.<Principal>emptyList() : principalDatabase.getUsers();
- Collection<User> principals = new ArrayList<User>(users.size());
- for(Principal user : users)
- {
- principals.add(new PrincipalAdapter(user));
- }
- @SuppressWarnings("unchecked")
- Collection<C> unmodifiablePrincipals = (Collection<C>) Collections.unmodifiableCollection(principals);
- return unmodifiablePrincipals;
- }
- else
- {
- return super.getChildren(clazz);
- }
+ return super.getChildren(clazz);
}
@Override
@@ -426,12 +459,13 @@ public abstract class PrincipalDatabaseAuthenticationManager<T extends Principal
try
{
String userName = _user.getName();
- deleteUser(userName);
+ deleteUserFromDatabase(userName);
PreferencesProvider preferencesProvider = getPreferencesProvider();
if (preferencesProvider != null)
{
preferencesProvider.deletePreferences(userName);
}
+ deleted();
}
catch (AccountNotFoundException e)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
index f9d25e3ec0..5628d49949 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/auth/manager/ScramSHA1AuthenticationManager.java
@@ -478,8 +478,8 @@ public class ScramSHA1AuthenticationManager
{
_authenticationManager.getSecurityManager().authoriseUserOperation(Operation.DELETE, getName());
_authenticationManager._users.remove(getName());
- _authenticationManager.unregisterChild(this);
- _authenticationManager.childRemoved(this);
+ _authenticationManager.deleted();
+ deleted();
return true;
}
else
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java
deleted file mode 100644
index e11a4f83db..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManager.java
+++ /dev/null
@@ -1,239 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.security.group;
-
-import java.io.File;
-import java.io.IOException;
-import java.security.Principal;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.Set;
-
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-
-/**
- * Implementation of a group manager whose implementation is backed by a flat group file.
- * <p>
- * This plugin is configured in the following manner:
- * </p>
- * <pre>
- * "groupproviders":[
- * ...
- * {
- * "name" : "...",
- * "type" : "GroupFile",
- * "path" : "path/to/file/with/groups",
- * }
- * ...
- * ]
- * </pre>
- */
-public class FileGroupManager implements GroupManager
-{
- private final FileGroupDatabase _groupDatabase;
- private final String _groupFile;
-
- public FileGroupManager(String groupFile)
- {
- _groupFile = groupFile;
- _groupDatabase = new FileGroupDatabase();
- }
-
- @Override
- public Set<Principal> getGroupPrincipalsForUser(String userId)
- {
- Set<String> groups = _groupDatabase.getGroupsForUser(userId);
- if (groups.isEmpty())
- {
- return Collections.emptySet();
- }
- else
- {
- Set<Principal> principals = new HashSet<Principal>();
- for (String groupName : groups)
- {
- principals.add(new GroupPrincipal(groupName));
- }
- return principals;
- }
- }
-
- @Override
- public Set<Principal> getUserPrincipalsForGroup(String group)
- {
- Set<String> users = _groupDatabase.getUsersInGroup(group);
- if (users.isEmpty())
- {
- return Collections.emptySet();
- }
- else
- {
- Set<Principal> principals = new HashSet<Principal>();
- for (String user : users)
- {
- principals.add(new UsernamePrincipal(user));
- }
- return principals;
- }
- }
-
- @Override
- public Set<Principal> getGroupPrincipals()
- {
- Set<String> groups = _groupDatabase.getAllGroups();
- if (groups.isEmpty())
- {
- return Collections.emptySet();
- }
- else
- {
- Set<Principal> principals = new HashSet<Principal>();
- for (String groupName : groups)
- {
- principals.add(new GroupPrincipal(groupName));
- }
- return principals;
- }
- }
-
- @Override
- public void createGroup(String group)
- {
- _groupDatabase.createGroup(group);
- }
-
- @Override
- public void removeGroup(String group)
- {
- _groupDatabase.removeGroup(group);
- }
-
- @Override
- public void addUserToGroup(String user, String group)
- {
- _groupDatabase.addUserToGroup(user, group);
- }
-
- @Override
- public void removeUserFromGroup(String user, String group)
- {
- _groupDatabase.removeUserFromGroup(user, group);
-
- }
-
- @Override
- public void onDelete()
- {
- File file = new File(_groupFile);
- if (file.exists())
- {
- if (!file.delete())
- {
- throw new IllegalConfigurationException("Cannot delete group file");
- }
- }
- }
-
- @Override
- public void onCreate()
- {
- File file = new File(_groupFile);
- if (!file.exists())
- {
- File parent = file.getParentFile();
- if (!parent.exists())
- {
- parent.mkdirs();
- }
- if (parent.exists())
- {
- try
- {
- file.createNewFile();
- }
- catch (IOException e)
- {
- throw new IllegalConfigurationException("Cannot create group file");
- }
- }
- else
- {
- throw new IllegalConfigurationException("Cannot create group file");
- }
- }
- }
-
- @Override
- public void open()
- {
- try
- {
- _groupDatabase.setGroupFile(_groupFile);
- }
- catch (IOException e)
- {
- throw new IllegalConfigurationException("Unable to set group file " + _groupFile, e);
- }
- }
-
- @Override
- public void close()
- {
- // no-op
- }
-
- @Override
- public int hashCode()
- {
- return ((_groupFile == null) ? 0 : _groupFile.hashCode());
- }
-
- @Override
- public boolean equals(Object obj)
- {
- if (this == obj)
- {
- return true;
- }
- if (obj == null)
- {
- return false;
- }
- if (getClass() != obj.getClass())
- {
- return false;
- }
- FileGroupManager other = (FileGroupManager) obj;
- if (_groupFile == null)
- {
- if (other._groupFile != null)
- {
- return false;
- }
- else
- {
- return true;
- }
- }
- return _groupFile.equals(other._groupFile);
- }
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java
deleted file mode 100644
index 50f08623cd..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/group/FileGroupManagerFactory.java
+++ /dev/null
@@ -1,79 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.security.group;
-
-import static org.apache.qpid.server.util.MapValueConverter.getStringAttribute;
-
-import java.util.Arrays;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.Map;
-
-import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.plugin.GroupManagerFactory;
-import org.apache.qpid.server.util.ResourceBundleLoader;
-
-public class FileGroupManagerFactory implements GroupManagerFactory
-{
- public static final String RESOURCE_BUNDLE = "org.apache.qpid.server.security.group.FileGroupProviderAttributeDescriptions";
-
- public static final String GROUP_FILE_PROVIDER_TYPE = "GroupFile";
- public static final String PATH = "path";
-
- public static final Collection<String> ATTRIBUTES = Collections.<String> unmodifiableList(Arrays.asList(
- ATTRIBUTE_TYPE,
- PATH
- ));
-
- @Override
- public GroupManager createInstance(Map<String, Object> attributes)
- {
- if(attributes == null || !GROUP_FILE_PROVIDER_TYPE.equals(attributes.get(ATTRIBUTE_TYPE)))
- {
- return null;
- }
-
- String groupFile = getStringAttribute(PATH, attributes, null);
- if (groupFile == null || "".equals(groupFile.trim()))
- {
- throw new IllegalConfigurationException("Path to file containing groups is not specified!");
- }
-
- return new FileGroupManager(groupFile);
- }
-
- @Override
- public String getType()
- {
- return GROUP_FILE_PROVIDER_TYPE;
- }
-
- @Override
- public Collection<String> getAttributeNames()
- {
- return ATTRIBUTES;
- }
-
- @Override
- public Map<String, String> getAttributeDescriptions()
- {
- return ResourceBundleLoader.getResources(RESOURCE_BUNDLE);
- }
-
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index c28c440a87..c49f626242 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -42,15 +42,13 @@ import javax.security.auth.Subject;
import org.apache.log4j.Logger;
+import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.connection.ConnectionRegistry;
import org.apache.qpid.server.connection.IConnectionRegistry;
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
-import org.apache.qpid.server.exchange.DefaultExchangeRegistry;
-import org.apache.qpid.server.exchange.ExchangeFactory;
+import org.apache.qpid.server.exchange.DefaultDestination;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ConfigStoreMessages;
import org.apache.qpid.server.logging.messages.MessageStoreMessages;
@@ -64,7 +62,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.*;
import org.apache.qpid.server.model.adapter.ConnectionAdapter;
import org.apache.qpid.server.model.adapter.VirtualHostAliasAdapter;
-import org.apache.qpid.server.plugin.ExchangeType;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.QpidServiceLoader;
import org.apache.qpid.server.plugin.SystemNodeCreator;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -78,6 +76,7 @@ import org.apache.qpid.server.queue.QueueRegistry;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.stats.StatisticsCounter;
+import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfigurationStoreHelper;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
@@ -85,6 +84,7 @@ import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
import org.apache.qpid.server.txn.LocalTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
@@ -106,10 +106,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
private final QueueRegistry _queueRegistry;
- private final ExchangeRegistry _exchangeRegistry;
-
- private final ExchangeFactory _exchangeFactory;
-
private final ConnectionRegistry _connectionRegistry;
private final DtxRegistry _dtxRegistry;
@@ -166,6 +162,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@ManagedAttributeField
private String _securityAcl;
+ private MessageDestination _defaultDestination;
public AbstractVirtualHost(final Map<String, Object> attributes, Broker<?> broker)
@@ -186,9 +183,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
_queueFactory = new AMQQueueFactory(this, _queueRegistry);
- _exchangeFactory = new DefaultExchangeFactory(this);
-
- _exchangeRegistry = new DefaultExchangeRegistry(this, _queueRegistry);
+ _defaultDestination = new DefaultDestination(this);
}
@@ -294,6 +289,40 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
abstract protected void initialiseStorage(org.apache.qpid.server.model.VirtualHost<?,?,?> virtualHost);
+ protected boolean isStoreEmpty()
+ {
+ final IsStoreEmptyHandler isStoreEmptyHandler = new IsStoreEmptyHandler();
+
+ getDurableConfigurationStore().visitConfiguredObjectRecords(isStoreEmptyHandler);
+
+ return isStoreEmptyHandler.isEmpty();
+ }
+
+ protected void createDefaultExchanges()
+ {
+ Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Void>()
+ {
+ @Override
+ public Void run()
+ {
+ addStandardExchange(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+ addStandardExchange(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS);
+ addStandardExchange(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+ addStandardExchange(ExchangeDefaults.FANOUT_EXCHANGE_NAME, ExchangeDefaults.FANOUT_EXCHANGE_CLASS);
+ return null;
+ }
+
+ void addStandardExchange(String name, String type)
+ {
+ Map<String, Object> attributes = new HashMap<String, Object>();
+ attributes.put(Exchange.NAME, name);
+ attributes.put(Exchange.TYPE, type);
+ attributes.put(Exchange.ID, UUIDGenerator.generateExchangeUUID(name, getName()));
+ childAdded(addExchange(attributes));
+ }
+ });
+ }
+
abstract protected MessageStoreLogSubject getMessageStoreLogSubject();
public IConnectionRegistry getConnectionRegistry()
@@ -387,11 +416,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public <C extends ConfiguredObject> Collection<C> getChildren(Class<C> clazz)
{
- if(clazz == Exchange.class)
- {
- return (Collection<C>) getExchanges();
- }
- else if(clazz == Queue.class)
+ if(clazz == Queue.class)
{
return (Collection<C>) getQueues();
}
@@ -405,12 +430,12 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
else
{
- return Collections.emptySet();
+ return super.getChildren(clazz);
}
}
@Override
- public <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
+ protected <C extends ConfiguredObject> C addChild(Class<C> childClass, Map<String, Object> attributes, ConfiguredObject... otherParents)
{
checkVHostStateIsActive();
if(childClass == Exchange.class)
@@ -436,13 +461,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
public Collection<String> getExchangeTypeNames()
{
- Collection<String> exchangeTypes = new ArrayList<String>();
-
- for(ExchangeType<? extends ExchangeImpl> type : getExchangeTypes())
- {
- exchangeTypes.add(type.getType());
- }
- return Collections.unmodifiableCollection(exchangeTypes);
+ return getObjectFactory().getSupportedTypes(Exchange.class);
}
@@ -540,20 +559,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
- protected void initialiseModel()
- {
- Subject.doAs(getSecurityManager().getSubjectWithAddedSystemRights(), new PrivilegedAction<Object>()
- {
- @Override
- public Object run()
- {
- _exchangeRegistry.initialise(_exchangeFactory);
- return null;
- }
- });
- }
-
-
public long getCreateTime()
{
return _createTime;
@@ -564,63 +569,6 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _queueRegistry;
}
- protected ExchangeRegistry getExchangeRegistry()
- {
- return _exchangeRegistry;
- }
-
- protected ExchangeFactory getExchangeFactory()
- {
- return _exchangeFactory;
- }
-
- @Override
- public void addVirtualHostListener(final VirtualHostListener listener)
- {
- _exchangeRegistry.addRegistryChangeListener(new ExchangeRegistry.RegistryChangeListener()
- {
- @Override
- public void exchangeRegistered(ExchangeImpl exchange)
- {
- listener.exchangeRegistered(exchange);
- }
-
- @Override
- public void exchangeUnregistered(ExchangeImpl exchange)
- {
- listener.exchangeUnregistered(exchange);
- }
- });
- _queueRegistry.addRegistryChangeListener(new QueueRegistry.RegistryChangeListener()
- {
- @Override
- public void queueRegistered(AMQQueue queue)
- {
- listener.queueRegistered(queue);
- }
-
- @Override
- public void queueUnregistered(AMQQueue queue)
- {
- listener.queueUnregistered(queue);
- }
- });
- _connectionRegistry.addRegistryChangeListener(new IConnectionRegistry.RegistryChangeListener()
- {
- @Override
- public void connectionRegistered(AMQConnectionModel connection)
- {
- listener.connectionRegistered(connection);
- }
-
- @Override
- public void connectionUnregistered(AMQConnectionModel connection)
- {
- listener.connectionUnregistered(connection);
- }
- });
- }
-
@Override
public AMQQueue<?> getQueue(String name)
{
@@ -716,7 +664,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
if(!attributes.containsKey(Queue.ID))
{
- UUID id = UUIDGenerator.generateQueueUUID(queueName, getName());
+ UUID id = UUID.randomUUID();
while(_queueRegistry.getQueue(id) != null)
{
id = UUID.randomUUID();
@@ -749,31 +697,26 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public ExchangeImpl getExchange(String name)
{
- return _exchangeRegistry.getExchange(name);
+ return getChildByName(ExchangeImpl.class,name);
}
@Override
public ExchangeImpl getExchange(UUID id)
{
- return _exchangeRegistry.getExchange(id);
+ return getChildById(ExchangeImpl.class, id);
}
@Override
public MessageDestination getDefaultDestination()
{
- return _exchangeRegistry.getDefaultExchange();
+ return _defaultDestination;
}
@Override
public Collection<ExchangeImpl<?>> getExchanges()
{
- return Collections.unmodifiableCollection(_exchangeRegistry.getExchanges());
- }
-
- @Override
- public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes()
- {
- return _exchangeFactory.getRegisteredTypes();
+ Collection children = getChildren(Exchange.class);
+ return children;
}
public ExchangeImpl<?> createExchange(final String name,
@@ -889,69 +832,27 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
throws ExchangeExistsException, ReservedExchangeNameException,
UnknownExchangeException, AMQUnknownExchangeType
{
- String name = MapValueConverter.getStringAttribute(org.apache.qpid.server.model.Exchange.NAME, attributes);
- if(attributes.get(Exchange.DURABLE) == null)
- {
- attributes = new HashMap<String, Object>(attributes);
- attributes.put(Exchange.DURABLE, false);
- }
- boolean durable =
- MapValueConverter.getBooleanAttribute(Exchange.DURABLE, attributes);
+ Broker<?> broker = getParent(Broker.class);
+ ConfiguredObjectTypeFactory<? extends Exchange> factory =
+ broker.getObjectFactory().getConfiguredObjectTypeFactory(Exchange.class, attributes);
- synchronized (_exchangeRegistry)
+ try
{
- ExchangeImpl existing;
- if((existing = _exchangeRegistry.getExchange(name)) !=null)
- {
- throw new ExchangeExistsException(name,existing);
- }
- if(_exchangeRegistry.isReservedExchangeName(name))
- {
- throw new ReservedExchangeNameException(name);
- }
-
-
- if(attributes.get(org.apache.qpid.server.model.Exchange.ID) == null)
- {
- attributes = new LinkedHashMap<String, Object>(attributes);
- attributes.put(org.apache.qpid.server.model.Exchange.ID,
- UUIDGenerator.generateExchangeUUID(name, getName()));
- }
-
- ExchangeImpl exchange = _exchangeFactory.createExchange(attributes);
-
- _exchangeRegistry.registerExchange(exchange);
- if(durable)
- {
- DurableConfigurationStoreHelper.createExchange(getDurableConfigurationStore(), exchange);
- }
- return exchange;
+ return (ExchangeImpl) factory.create(attributes, this);
+ }
+ catch (DuplicateNameException e)
+ {
+ throw new ExchangeExistsException(getExchange(e.getName()));
}
+
}
@Override
public void removeExchange(ExchangeImpl exchange, boolean force)
throws ExchangeIsAlternateException, RequiredExchangeException
{
- if(exchange.hasReferrers())
- {
- throw new ExchangeIsAlternateException(exchange.getName());
- }
-
- for(ExchangeType type : getExchangeTypes())
- {
- if(type.getDefaultExchangeName().equals( exchange.getName() ))
- {
- throw new RequiredExchangeException(exchange.getName());
- }
- }
- _exchangeRegistry.unregisterExchange(exchange.getName(), !force);
- if (exchange.isDurable() && !exchange.isAutoDelete())
- {
- DurableConfigurationStoreHelper.removeExchange(getDurableConfigurationStore(), exchange);
- }
-
+ exchange.delete();
}
public SecurityManager getSecurityManager()
@@ -959,18 +860,21 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return _broker.getSecurityManager();
}
+ @Override
+ public ConfiguredObjectFactory getObjectFactory()
+ {
+ return _broker.getObjectFactory();
+ }
+
public void close()
{
//Stop Connections
_connectionRegistry.close();
- _queueRegistry.stopAllAndUnregisterMBeans();
+ _queueRegistry.close();
_dtxRegistry.close();
closeStorage();
shutdownHouseKeeping();
- // clear exchange objects
- _exchangeRegistry.clearAndUnregisterMbeans();
-
_state = VirtualHostState.STOPPED;
_eventLogger.message(VirtualHostMessages.CLOSED(getName()));
@@ -1220,9 +1124,9 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
protected Map<String, DurableConfiguredObjectRecoverer> getDurableConfigurationRecoverers()
{
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(this, getExchangeRegistry(), _queueFactory),
- new ExchangeRecoverer(getExchangeRegistry(), getExchangeFactory()),
- new BindingRecoverer(this, getExchangeRegistry())
+ new QueueRecoverer(this, _queueFactory),
+ new ExchangeRecoverer(this),
+ new BindingRecoverer(this)
};
final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
@@ -1233,6 +1137,35 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
return recovererMap;
}
+ private static class IsStoreEmptyHandler implements ConfiguredObjectRecordHandler
+ {
+ private boolean _empty = true;
+
+ @Override
+ public void begin()
+ {
+ }
+
+ @Override
+ public boolean handle(final ConfiguredObjectRecord record)
+ {
+ // if there is a non vhost record then the store is not empty and we can stop looking at the records
+ _empty = record.getType().equals(VirtualHost.class.getSimpleName());
+ return _empty;
+ }
+
+ @Override
+ public void end()
+ {
+
+ }
+
+ public boolean isEmpty()
+ {
+ return _empty;
+ }
+ }
+
private class VirtualHostHouseKeepingTask extends HouseKeepingTask
{
public VirtualHostHouseKeepingTask()
@@ -1425,12 +1358,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
}
else if(SUPPORTED_EXCHANGE_TYPES.equals(name))
{
- List<String> types = new ArrayList<String>();
- for(ExchangeType<?> type : getExchangeTypes())
- {
- types.add(type.getType());
- }
- return Collections.unmodifiableCollection(types);
+ return getObjectFactory().getSupportedTypes(Exchange.class);
}
else if(SUPPORTED_QUEUE_TYPES.equals(name))
{
@@ -1443,12 +1371,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
@Override
public Collection<String> getSupportedExchangeTypes()
{
- List<String> types = new ArrayList<String>();
- for(ExchangeType<?> type : getExchangeTypes())
- {
- types.add(type.getType());
- }
- return Collections.unmodifiableCollection(types);
+ return getObjectFactory().getSupportedTypes(Exchange.class);
}
@Override
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
index 11623bd36b..a976db05f6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/BindingRecoverer.java
@@ -24,10 +24,11 @@ import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.binding.BindingImpl;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
@@ -40,13 +41,10 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
{
private static final Logger _logger = Logger.getLogger(BindingRecoverer.class);
- private final ExchangeRegistry _exchangeRegistry;
private final VirtualHostImpl _virtualHost;
- public BindingRecoverer(final VirtualHostImpl virtualHost,
- final ExchangeRegistry exchangeRegistry)
+ public BindingRecoverer(final VirtualHostImpl virtualHost)
{
- _exchangeRegistry = exchangeRegistry;
_virtualHost = virtualHost;
}
@@ -81,7 +79,7 @@ public class BindingRecoverer extends AbstractDurableConfiguredObjectRecoverer<B
_bindingId = record.getId();
_exchangeId = record.getParents().get(Exchange.class.getSimpleName()).getId();
_queueId = record.getParents().get(Queue.class.getSimpleName()).getId();
- _exchange = _exchangeRegistry.getExchange(_exchangeId);
+ _exchange = _virtualHost.getExchange(_exchangeId);
if(_exchange == null)
{
_unresolvedDependencies.add(new ExchangeDependency());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
index 695242726d..109bf85aef 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeExistsException.java
@@ -25,9 +25,9 @@ public class ExchangeExistsException extends RuntimeException
{
private final ExchangeImpl _existing;
- public ExchangeExistsException(String name, ExchangeImpl existing)
+ public ExchangeExistsException(ExchangeImpl existing)
{
- super(name);
+ super(existing.getName());
_existing = existing;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
index 4674a4a534..3e70cd865a 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeIsAlternateException.java
@@ -20,7 +20,7 @@
*/
package org.apache.qpid.server.virtualhost;
-public class ExchangeIsAlternateException extends Exception
+public class ExchangeIsAlternateException extends RuntimeException
{
public ExchangeIsAlternateException(String name)
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
index 4431fc786d..4bf7635513 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/ExchangeRecoverer.java
@@ -22,26 +22,32 @@ package org.apache.qpid.server.virtualhost;
import java.util.HashMap;
import java.util.Map;
-import java.util.UUID;
+
import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.store.AbstractDurableConfiguredObjectRecoverer;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.store.UnresolvedDependency;
import org.apache.qpid.server.store.UnresolvedObject;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<ExchangeImpl>
{
- private final ExchangeRegistry _exchangeRegistry;
- private final ExchangeFactory _exchangeFactory;
+ private final VirtualHostImpl<?,?,?> _vhost;
+ private final ConfiguredObjectFactory _objectFactory;
- public ExchangeRecoverer(final ExchangeRegistry exchangeRegistry, final ExchangeFactory exchangeFactory)
+ public ExchangeRecoverer(final VirtualHostImpl vhost)
{
- _exchangeRegistry = exchangeRegistry;
- _exchangeFactory = exchangeFactory;
+ _vhost = vhost;
+ Broker<?> broker = _vhost.getParent(Broker.class);
+ SystemContext<?> systemContext = broker.getParent(SystemContext.class);
+ _objectFactory = systemContext.getObjectFactory();
}
@Override
@@ -53,31 +59,36 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
@Override
public UnresolvedObject<ExchangeImpl> createUnresolvedObject(final ConfiguredObjectRecord record)
{
- return new UnresolvedExchange(record.getId(), record.getAttributes());
+ return new UnresolvedExchange(record);
}
private class UnresolvedExchange implements UnresolvedObject<ExchangeImpl>
{
private ExchangeImpl<?> _exchange;
- public UnresolvedExchange(final UUID id,
- final Map<String, Object> attributeMap)
+ public UnresolvedExchange(ConfiguredObjectRecord record)
{
+ Map<String,Object> attributeMap = record.getAttributes();
String exchangeName = (String) attributeMap.get(org.apache.qpid.server.model.Exchange.NAME);
try
{
- _exchange = _exchangeRegistry.getExchange(id);
+ _exchange = _vhost.getExchange(record.getId());
if(_exchange == null)
{
- _exchange = _exchangeRegistry.getExchange(exchangeName);
+ _exchange = _vhost.getExchange(exchangeName);
}
if (_exchange == null)
{
Map<String,Object> attributesWithId = new HashMap<String,Object>(attributeMap);
- attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,id);
+ attributesWithId.put(org.apache.qpid.server.model.Exchange.ID,record.getId());
attributesWithId.put(org.apache.qpid.server.model.Exchange.DURABLE,true);
- _exchange = _exchangeFactory.restoreExchange(attributesWithId);
- _exchangeRegistry.registerExchange(_exchange);
+
+ ConfiguredObjectTypeFactory<? extends Exchange> configuredObjectTypeFactory =
+ _objectFactory.getConfiguredObjectTypeFactory(Exchange.class, attributesWithId);
+ UnresolvedConfiguredObject<? extends Exchange> unresolvedConfiguredObject =
+ configuredObjectTypeFactory.recover(record, _vhost);
+ _exchange = (ExchangeImpl<?>) unresolvedConfiguredObject.resolve();
+
}
}
catch (AMQUnknownExchangeType e)
@@ -101,6 +112,7 @@ public class ExchangeRecoverer extends AbstractDurableConfiguredObjectRecoverer<
@Override
public ExchangeImpl resolve()
{
+ _exchange.open();
return _exchange;
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
index d7fc08f249..788ae8ac9f 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/QueueRecoverer.java
@@ -20,14 +20,15 @@
*/
package org.apache.qpid.server.virtualhost;
-import java.util.LinkedHashMap;
import java.util.ArrayList;
+import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
+
import org.apache.log4j.Logger;
+
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueFactory;
@@ -40,15 +41,12 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
{
private static final Logger _logger = Logger.getLogger(QueueRecoverer.class);
private final VirtualHostImpl _virtualHost;
- private final ExchangeRegistry _exchangeRegistry;
private final QueueFactory _queueFactory;
public QueueRecoverer(final VirtualHostImpl virtualHost,
- final ExchangeRegistry exchangeRegistry,
final QueueFactory queueFactory)
{
_virtualHost = virtualHost;
- _exchangeRegistry = exchangeRegistry;
_queueFactory = queueFactory;
}
@@ -82,7 +80,7 @@ public class QueueRecoverer extends AbstractDurableConfiguredObjectRecoverer<AMQ
_id = id;
if (_alternateExchangeId != null)
{
- _alternateExchange = _exchangeRegistry.getExchange(_alternateExchangeId);
+ _alternateExchange = _virtualHost.getExchange(_alternateExchangeId);
if(_alternateExchange == null)
{
_dependencies.add(new AlternateExchangeDependency());
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
index 5073c558da..35824233c9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/RequiredExchangeException.java
@@ -20,10 +20,10 @@
*/
package org.apache.qpid.server.virtualhost;
-public class RequiredExchangeException extends Exception
+public class RequiredExchangeException extends RuntimeException
{
public RequiredExchangeException(String name)
{
- super(name);
+ super("'" + name + "' is a reserved exchange and can't be deleted");
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 6f75a67197..e2454b0d18 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -126,18 +126,21 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost
getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.CREATED());
Map<String, Object> configurationStoreSettings = virtualHost.getConfigurationStoreSettings();
- String configurationStoreType = configurationStoreSettings == null ? null : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
+ String configurationStoreType = configurationStoreSettings == null
+ ? null
+ : (String) configurationStoreSettings.get(DurableConfigurationStore.STORE_TYPE);
_durableConfigurationStore = initialiseConfigurationStore(configurationStoreType);
boolean combinedStores = _durableConfigurationStore == _messageStore;
if (combinedStores)
{
- configurationStoreSettings = new HashMap<String,Object>(messageStoreSettings);
+ configurationStoreSettings = new HashMap<String, Object>(messageStoreSettings);
configurationStoreSettings.put(DurableConfigurationStore.IS_MESSAGE_STORE_TOO, true);
}
if (!combinedStores)
{
- _configurationStoreLogSubject = new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
+ _configurationStoreLogSubject =
+ new MessageStoreLogSubject(getName(), _durableConfigurationStore.getClass().getSimpleName());
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.CREATED());
}
@@ -145,26 +148,33 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost
_messageStore.openMessageStore(virtualHost, virtualHost.getMessageStoreSettings());
- getEventLogger().message(_messageStoreLogSubject, MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
+ getEventLogger().message(_messageStoreLogSubject,
+ MessageStoreMessages.STORE_LOCATION(_messageStore.getStoreLocation()));
if (_configurationStoreLogSubject != null)
{
- getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
+ getEventLogger().message(_configurationStoreLogSubject,
+ ConfigStoreMessages.STORE_LOCATION(configurationStoreSettings.toString()));
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_START());
}
- ConfiguredObjectRecordHandler upgraderRecoverer = new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
- _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
+ if (isStoreEmpty())
+ {
+ createDefaultExchanges();
+ }
+ else
+ {
+ ConfiguredObjectRecordHandler upgraderRecoverer =
+ new ConfiguredObjectRecordRecoveverAndUpgrader(this, getDurableConfigurationRecoverers());
+ _durableConfigurationStore.visitConfiguredObjectRecords(upgraderRecoverer);
+ }
if (_configurationStoreLogSubject != null)
{
getEventLogger().message(_configurationStoreLogSubject, ConfigStoreMessages.RECOVERY_COMPLETE());
}
- // If store does not have entries for standard exchanges (amq.*), the following will create them.
- initialiseModel();
-
new MessageStoreRecoverer(this, getMessageStoreLogSubject()).recover();
attainActivation();
@@ -193,4 +203,5 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost
{
return _configurationStoreLogSubject;
}
+
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
index 9a1390d2e8..91b1a9e408 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostImpl.java
@@ -34,8 +34,8 @@ import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.EventLoggerProvider;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.VirtualHost;
-import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
@@ -50,6 +50,8 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
EventLoggerProvider,
VirtualHost<X,Q,E>
{
+ String DEFAULT_DLE_NAME_SUFFIX = "_DLE";
+
IConnectionRegistry getConnectionRegistry();
String getName();
@@ -82,15 +84,13 @@ public interface VirtualHostImpl< X extends VirtualHostImpl<X,Q,E>, Q extends AM
Collection<E> getExchanges();
- Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes();
-
DurableConfigurationStore getDurableConfigurationStore();
MessageStore getMessageStore();
SecurityManager getSecurityManager();
- void addVirtualHostListener(VirtualHostListener listener);
+ ConfiguredObjectFactory getObjectFactory();
void close();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
deleted file mode 100644
index af8b0c8f29..0000000000
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostListener.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.virtualhost;
-
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.protocol.AMQConnectionModel;
-import org.apache.qpid.server.queue.AMQQueue;
-
-public interface VirtualHostListener
-{
-
- public void queueRegistered(AMQQueue queue);
-
- public void queueUnregistered(AMQQueue queue);
-
- public void connectionRegistered(AMQConnectionModel connection);
-
- public void connectionUnregistered(AMQConnectionModel connection);
-
- public void exchangeRegistered(ExchangeImpl exchange);
-
- public void exchangeUnregistered(ExchangeImpl exchange);
-}
diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
index c2f1353671..a5dfec8a29 100644
--- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
+++ b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory
@@ -35,6 +35,10 @@ org.apache.qpid.server.model.adapter.BrokerAdapterFactory
org.apache.qpid.server.model.adapter.StandardVirtualHostFactory
org.apache.qpid.server.model.adapter.FileBasedGroupProviderFactory
org.apache.qpid.server.model.adapter.FileSystemPreferencesProviderFactory
+org.apache.qpid.server.exchange.DirectExchangeFactory
+org.apache.qpid.server.exchange.FanoutExchangeFactory
+org.apache.qpid.server.exchange.HeadersExchangeFactory
+org.apache.qpid.server.exchange.TopicExchangeFactory
diff --git a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory b/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
deleted file mode 100644
index 6bfb55ff18..0000000000
--- a/qpid/java/broker-core/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.GroupManagerFactory
+++ /dev/null
@@ -1,19 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-org.apache.qpid.server.security.group.FileGroupManagerFactory
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java
index 8df5576715..43b1ed508a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/BrokerConfigurationStoreCreatorTest.java
@@ -40,7 +40,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
@@ -53,6 +53,7 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase
private File _userStoreLocation;
private BrokerConfigurationStoreCreator _storeCreator;
private SystemContext _systemContext;
+ private TaskExecutor _taskExecutor;
public void setUp() throws Exception
{
@@ -69,8 +70,10 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase
_userStoreLocation = new File(TMP_FOLDER, "_store_" + System.currentTimeMillis() + "_" + getTestName());
final BrokerOptions brokerOptions = mock(BrokerOptions.class);
when(brokerOptions.getConfigurationStoreLocation()).thenReturn(_userStoreLocation.getAbsolutePath());
- _systemContext = new SystemContextImpl(new TaskExecutor(),
- new ConfiguredObjectFactory(Model.getInstance()),
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ _systemContext = new SystemContextImpl(_taskExecutor,
+ new ConfiguredObjectFactoryImpl(Model.getInstance()),
mock(EventLogger.class),
mock(LogRecorder.class),
brokerOptions);
@@ -81,6 +84,7 @@ public class BrokerConfigurationStoreCreatorTest extends QpidTestCase
try
{
super.tearDown();
+ _taskExecutor.stop();
}
finally
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
index 04e41aa584..2a718b5c15 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/BrokerRecovererTest.java
@@ -43,6 +43,7 @@ import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Port;
@@ -66,14 +67,17 @@ public class BrokerRecovererTest extends TestCase
private UUID _authenticationProvider1Id = UUID.randomUUID();
private SystemContext _systemContext;
private ConfiguredObjectFactory _configuredObjectFactory;
+ private TaskExecutor _taskExecutor;
@Override
protected void setUp() throws Exception
{
super.setUp();
- _configuredObjectFactory = new ConfiguredObjectFactory(Model.getInstance());
- _systemContext = new SystemContextImpl(mock(TaskExecutor.class),
+ _configuredObjectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ _systemContext = new SystemContextImpl(_taskExecutor,
_configuredObjectFactory, mock(EventLogger.class), mock(LogRecorder.class), mock(BrokerOptions.class));
when(_brokerEntry.getId()).thenReturn(_brokerId);
@@ -93,6 +97,13 @@ public class BrokerRecovererTest extends TestCase
_brokerEntryChildren.put(AuthenticationProvider.class.getSimpleName(), Arrays.asList(_authenticationProviderEntry1));
}
+ @Override
+ protected void tearDown() throws Exception
+ {
+ super.tearDown();
+ _taskExecutor.stop();
+ }
+
public void testCreateBrokerAttributes()
{
Map<String, Object> attributes = new HashMap<String, Object>();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
index d9c0220e5f..07e9cecb8b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/FileKeyStoreCreationTest.java
@@ -36,6 +36,7 @@ import org.apache.qpid.server.model.AbstractConfiguredObject;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.KeyStore;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
@@ -51,7 +52,7 @@ public class FileKeyStoreCreationTest extends TestCase
public void setUp() throws Exception
{
super.setUp();
- _factory = new ConfiguredObjectFactory(Model.getInstance());
+ _factory = new ConfiguredObjectFactoryImpl(Model.getInstance());
}
public void testCreateWithAllAttributesProvided()
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
index 42b66cbb85..ea1d22f9ef 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/startup/VirtualHostCreationTest.java
@@ -35,6 +35,9 @@ import org.apache.qpid.server.configuration.IllegalConfigurationException;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.store.MessageStore;
@@ -49,8 +52,11 @@ public class VirtualHostCreationTest extends TestCase
{
SecurityManager securityManager = mock(SecurityManager.class);
ConfigurationEntry entry = mock(ConfigurationEntry.class);
+ ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
Broker parent = mock(Broker.class);
+ when(parent.getObjectFactory()).thenReturn(objectFactory);
when(parent.getSecurityManager()).thenReturn(securityManager);
+ when(parent.getCategoryClass()).thenReturn(Broker.class);
VirtualHostRegistry virtualHostRegistry = mock(VirtualHostRegistry.class);
when(virtualHostRegistry.getEventLogger()).thenReturn(mock(EventLogger.class));
when(parent.getVirtualHostRegistry()).thenReturn(virtualHostRegistry);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
index 4f8e0d99dc..82ced3c274 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ConfigurationEntryStoreTestCase.java
@@ -28,6 +28,7 @@ import java.util.UUID;
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfigurationEntryImpl;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
@@ -58,10 +59,15 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase
private Map<String, Object> _virtualHostAttributes;
private Map<String, Object> _authenticationProviderAttributes;
+ private TaskExecutor _taskExecutor;
+
public void setUp() throws Exception
{
super.setUp();
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+
_brokerId = UUID.randomUUID();
_brokerAttributes = new HashMap<String, Object>();
_brokerAttributes.put(Broker.DEFAULT_VIRTUAL_HOST, "test");
@@ -85,6 +91,18 @@ public abstract class ConfigurationEntryStoreTestCase extends QpidTestCase
addConfiguration(_authenticationProviderId, AuthenticationProvider.class.getSimpleName(), _authenticationProviderAttributes);
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ _taskExecutor.stop();
+ }
+
+ protected TaskExecutor getTaskExecutor()
+ {
+ return _taskExecutor;
+ }
+
// ??? perhaps it should not be abstract
protected abstract MemoryConfigurationEntryStore createStore(UUID brokerId, Map<String, Object> brokerAttributes) throws Exception;
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java
index e6d47e6966..d3dc996d3e 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/JsonConfigurationEntryStoreTest.java
@@ -42,11 +42,10 @@ import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfigurationEntryImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.SystemContext;
@@ -100,8 +99,8 @@ public class JsonConfigurationEntryStoreTest extends ConfigurationEntryStoreTest
{
final BrokerOptions brokerOptions = mock(BrokerOptions.class);
when(brokerOptions.getConfigurationStoreLocation()).thenReturn(absolutePath);
- SystemContext context = new SystemContextImpl(new TaskExecutor(),
- new ConfiguredObjectFactory(Model.getInstance()),
+ SystemContext context = new SystemContextImpl(getTaskExecutor(),
+ new ConfiguredObjectFactoryImpl(Model.getInstance()),
mock(EventLogger.class),
mock(LogRecorder.class),
brokerOptions);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
index 7859a4110b..50169d9f4f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/ManagementModeStoreHandlerTest.java
@@ -44,7 +44,7 @@ import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
@@ -67,6 +67,7 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
private ConfiguredObjectRecord _portEntry;
private UUID _rootId, _portEntryId;
private SystemContext _systemContext;
+ private TaskExecutor _taskExecutor;
protected void setUp() throws Exception
{
@@ -74,9 +75,10 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
_rootId = UUID.randomUUID();
_portEntryId = UUID.randomUUID();
_store = mock(DurableConfigurationStore.class);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
-
- _systemContext = new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory(Model.getInstance()), mock(
+ _systemContext = new SystemContextImpl(_taskExecutor, new ConfiguredObjectFactoryImpl(Model.getInstance()), mock(
EventLogger.class), mock(LogRecorder.class), new BrokerOptions());
@@ -113,6 +115,13 @@ public class ManagementModeStoreHandlerTest extends QpidTestCase
_handler.openConfigurationStore(_systemContext,Collections.<String,Object>emptyMap());
}
+ @Override
+ public void tearDown() throws Exception
+ {
+ _taskExecutor.stop();
+ super.tearDown();
+ }
+
private ConfiguredObjectRecord getRootEntry()
{
BrokerFinder brokerFinder = new BrokerFinder();
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java
index bf8da6f364..92d42d779f 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/configuration/store/MemoryConfigurationEntryStoreTest.java
@@ -36,11 +36,10 @@ import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.ConfigurationEntry;
import org.apache.qpid.server.configuration.ConfigurationEntryImpl;
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.SystemContextImpl;
@@ -53,7 +52,7 @@ public class MemoryConfigurationEntryStoreTest extends ConfigurationEntryStoreTe
public void setUp() throws Exception
{
super.setUp();
- _systemContext = new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory(Model.getInstance()),
+ _systemContext = new SystemContextImpl(getTaskExecutor(), new ConfiguredObjectFactoryImpl(Model.getInstance()),
mock(EventLogger.class), mock(LogRecorder.class),
new BrokerOptions());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
index 7fd4f05a14..269d36e5c1 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java
@@ -21,6 +21,9 @@
package org.apache.qpid.server.consumer;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.net.SocketAddress;
import java.security.Principal;
import java.util.ArrayList;
@@ -40,6 +43,7 @@ import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Consumer;
import org.apache.qpid.server.model.Port;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.Transport;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -229,6 +233,13 @@ public class MockConsumer implements ConsumerTarget
private static class MockSessionModel implements AMQSessionModel
{
private final UUID _id = UUID.randomUUID();
+ private Session _modelObject;
+
+ private MockSessionModel()
+ {
+ _modelObject = mock(Session.class);
+ when(_modelObject.getCategoryClass()).thenReturn(Session.class);
+ }
@Override
public UUID getId()
@@ -352,6 +363,18 @@ public class MockConsumer implements ConsumerTarget
}
@Override
+ public void setModelObject(final Session session)
+ {
+ _modelObject = session;
+ }
+
+ @Override
+ public Session<?> getModelObject()
+ {
+ return _modelObject;
+ }
+
+ @Override
public void removeConsumerListener(final ConsumerListener listener)
{
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
deleted file mode 100644
index 090e8eead3..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/DefaultExchangeFactoryTest.java
+++ /dev/null
@@ -1,229 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.exchange;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.qpid.server.plugin.ExchangeType;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-@SuppressWarnings("rawtypes")
-public class DefaultExchangeFactoryTest extends QpidTestCase
-{
- private DirectExchangeType _directExchangeType;
- private TopicExchangeType _topicExchangeType;
- private FanoutExchangeType _fanoutExchangeType;
- private HeadersExchangeType _headersExchangeType;
-
- private List<ExchangeType> _stubbedExchangeTypes;
-
- protected void setUp() throws Exception
- {
- super.setUp();
-
- _directExchangeType = new DirectExchangeType();
- _topicExchangeType = new TopicExchangeType();
- _fanoutExchangeType = new FanoutExchangeType();
- _headersExchangeType = new HeadersExchangeType();
- _stubbedExchangeTypes = new ArrayList<ExchangeType>();
- }
-
- public void testCreateDefaultExchangeFactory()
- {
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_topicExchangeType);
- _stubbedExchangeTypes.add(_fanoutExchangeType);
- _stubbedExchangeTypes.add(_headersExchangeType);
-
- DefaultExchangeFactory factory = new TestExchangeFactory();
-
- Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = factory.getRegisteredTypes();
- assertEquals("Unexpected number of exchange types", _stubbedExchangeTypes.size(), registeredTypes.size());
- assertTrue("Direct exchange type is not found", registeredTypes.contains(_directExchangeType));
- assertTrue("Fanout exchange type is not found", registeredTypes.contains(_fanoutExchangeType));
- assertTrue("Topic exchange type is not found", registeredTypes.contains(_topicExchangeType));
- assertTrue("Headers exchange type is not found", registeredTypes.contains(_headersExchangeType));
- }
-
- public void testCreateDefaultExchangeFactoryWithoutAllBaseExchangeTypes()
- {
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory without all base classes");
- }
- catch (IllegalStateException e)
- {
- // pass
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithoutDirectExchangeType()
- {
- _stubbedExchangeTypes.add(_topicExchangeType);
- _stubbedExchangeTypes.add(_fanoutExchangeType);
- _stubbedExchangeTypes.add(_headersExchangeType);
-
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory without all base classes");
- }
- catch (IllegalStateException e)
- {
- assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _directExchangeType.getType(), e.getMessage());
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithoutTopicExchangeType()
- {
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_fanoutExchangeType);
- _stubbedExchangeTypes.add(_headersExchangeType);
-
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory without all base classes");
- }
- catch (IllegalStateException e)
- {
- assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _topicExchangeType.getType(), e.getMessage());
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithoutFanoutExchangeType()
- {
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_topicExchangeType);
- _stubbedExchangeTypes.add(_headersExchangeType);
-
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory without all base classes");
- }
- catch (IllegalStateException e)
- {
- assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _fanoutExchangeType.getType(), e.getMessage());
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithoutHeadersExchangeType()
- {
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_topicExchangeType);
- _stubbedExchangeTypes.add(_fanoutExchangeType);
-
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory without all base classes");
- }
- catch (IllegalStateException e)
- {
- assertEquals("Unexpected exception message", "Did not find expected exchange type: " + _headersExchangeType.getType(), e.getMessage());
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithDuplicateExchangeTypeName()
- {
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_directExchangeType);
-
- try
- {
- new TestExchangeFactory();
- fail("Cannot create factory with duplicate exchange type names");
- }
- catch (IllegalStateException e)
- {
- assertTrue( "Unexpected exception message", e.getMessage().contains("ExchangeType with type name '"
- + _directExchangeType.getType() + "' is already registered using class '"
- + DirectExchangeType.class.getName()));
- }
- }
-
- public void testCreateDefaultExchangeFactoryWithCustomExchangeType()
- {
- ExchangeType<?> customExchangeType = new CustomExchangeType();
-
- _stubbedExchangeTypes.add(customExchangeType);
- _stubbedExchangeTypes.add(_directExchangeType);
- _stubbedExchangeTypes.add(_topicExchangeType);
- _stubbedExchangeTypes.add(_fanoutExchangeType);
- _stubbedExchangeTypes.add(_headersExchangeType);
-
- DefaultExchangeFactory factory = new TestExchangeFactory();
-
- Collection<ExchangeType<? extends ExchangeImpl>> registeredTypes = factory.getRegisteredTypes();
- assertEquals("Unexpected number of exchange types", _stubbedExchangeTypes.size(), registeredTypes.size());
- assertTrue("Direct exchange type is not found", registeredTypes.contains(_directExchangeType));
- assertTrue("Fanout exchange type is not found", registeredTypes.contains(_fanoutExchangeType));
- assertTrue("Topic exchange type is not found", registeredTypes.contains(_topicExchangeType));
- assertTrue("Headers exchange type is not found", registeredTypes.contains(_headersExchangeType));
- assertTrue("Custom exchange type is not found", registeredTypes.contains(customExchangeType));
- }
-
- public static abstract class CustomExchange implements ExchangeImpl<CustomExchange>
- {
- }
-
- private static class CustomExchangeType implements ExchangeType<CustomExchange>
- {
- @Override
- public String getType()
- {
- return "my-custom-exchange";
- }
-
- @Override
- public CustomExchange newInstance(VirtualHostImpl host, Map<String,Object> attributes)
- {
- return null;
- }
-
- @Override
- public String getDefaultExchangeName()
- {
- return null;
- }
- }
-
- private final class TestExchangeFactory extends DefaultExchangeFactory
- {
- private TestExchangeFactory()
- {
- super(null);
- }
-
- @Override
- protected Iterable<ExchangeType> loadExchangeTypes()
- {
- return _stubbedExchangeTypes;
- }
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
index ddc9f5edf8..33c333c407 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java
@@ -30,27 +30,30 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+
import junit.framework.TestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.UnknownExchangeException;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
public class FanoutExchangeTest extends TestCase
{
private FanoutExchange _exchange;
private VirtualHostImpl _virtualHost;
+ private TaskExecutor _taskExecutor;
public void setUp() throws UnknownExchangeException
{
@@ -59,11 +62,21 @@ public class FanoutExchangeTest extends TestCase
attributes.put(Exchange.NAME, "test");
attributes.put(Exchange.DURABLE, false);
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
_virtualHost = mock(VirtualHostImpl.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+ when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
_exchange = new FanoutExchange(_virtualHost, attributes);
+ _exchange.open();
+ }
+
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ _taskExecutor.stop();
}
public void testIsBoundStringMapAMQQueueWhenQueueIsNull()
@@ -115,6 +128,7 @@ public class FanoutExchangeTest extends TestCase
{
AMQQueue queue = mock(AMQQueue.class);
when(queue.getVirtualHost()).thenReturn(_virtualHost);
+ when(queue.getCategoryClass()).thenReturn(Queue.class);
return queue;
}
@@ -123,6 +137,7 @@ public class FanoutExchangeTest extends TestCase
AMQQueue queue1 = mockQueue();
AMQQueue queue2 = mockQueue();
+
_exchange.addBinding("key",queue1, null);
_exchange.addBinding("key",queue2, null);
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
index 34c1487861..a450c942e6 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
@@ -20,6 +20,11 @@
*/
package org.apache.qpid.server.exchange;
+import static org.mockito.Matchers.anySet;
+import static org.mockito.Matchers.anyString;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
@@ -28,40 +33,44 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
+
import junit.framework.TestCase;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.qpid.common.AMQPFilterTypes;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.Exchange;
+import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.BaseQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.virtualhost.VirtualHostImpl;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Matchers.anySet;
-import static org.mockito.Matchers.anyString;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
-
public class HeadersExchangeTest extends TestCase
{
private HeadersExchange _exchange;
private VirtualHostImpl _virtualHost;
+ private TaskExecutor _taskExecutor;
@Override
public void setUp() throws Exception
{
super.setUp();
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
_virtualHost = mock(VirtualHostImpl.class);
SecurityManager securityManager = mock(SecurityManager.class);
when(_virtualHost.getSecurityManager()).thenReturn(securityManager);
when(_virtualHost.getEventLogger()).thenReturn(new EventLogger());
+ when(_virtualHost.getCategoryClass()).thenReturn(VirtualHost.class);
+ when(_virtualHost.getTaskExecutor()).thenReturn(_taskExecutor);
Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(Exchange.ID, UUID.randomUUID());
attributes.put(Exchange.NAME, "test");
@@ -71,6 +80,12 @@ public class HeadersExchangeTest extends TestCase
}
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ _taskExecutor.stop();
+ }
+
protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception
{
List<? extends BaseQueue> results = _exchange.route(msg, "", InstanceProperties.EMPTY);
@@ -127,6 +142,7 @@ public class HeadersExchangeTest extends TestCase
AMQQueue q = mock(AMQQueue.class);
when(q.toString()).thenReturn(name);
when(q.getVirtualHost()).thenReturn(_virtualHost);
+ when(q.getCategoryClass()).thenReturn(Queue.class);
return q;
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
index 8b3a5336e8..ed2de21b6b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AMQQueueFactoryTest.java
@@ -39,7 +39,6 @@ import org.mockito.stubbing.Answer;
import org.apache.qpid.exchange.ExchangeDefaults;
import org.apache.qpid.server.configuration.BrokerProperties;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.LifetimePolicy;
@@ -237,7 +236,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
@@ -277,7 +276,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
String queueName = "testDeadLetterQueueEnabled";
- String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
@@ -320,7 +319,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
String queueName = "testDeadLetterQueueDisabled";
- String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
@@ -350,7 +349,7 @@ public class AMQQueueFactoryTest extends QpidTestCase
{
String queueName = "testDeadLetterQueueNotCreatedForAutodeleteQueues";
- String dlExchangeName = queueName + DefaultExchangeFactory.DEFAULT_DLE_NAME_SUFFIX;
+ String dlExchangeName = queueName + VirtualHostImpl.DEFAULT_DLE_NAME_SUFFIX;
String dlQueueName = queueName + AMQQueueFactory.DEFAULT_DLQ_NAME_SUFFIX;
assertNull("The DLQ should not yet exist", _virtualHost.getQueue(dlQueueName));
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
index e38e4daea0..37519e7a0b 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java
@@ -44,6 +44,7 @@ public class StandardQueueTest extends AbstractQueueTestBase
public void testAutoDeleteQueue() throws Exception
{
getQueue().stop();
+ getQueue().delete();
Map<String,Object> queueAttributes = new HashMap<String, Object>();
queueAttributes.put(Queue.ID, UUID.randomUUID());
queueAttributes.put(Queue.NAME, getQname());
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java
deleted file mode 100644
index 90308d316b..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerFactoryTest.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.security.group;
-
-import java.util.HashMap;
-import java.util.Map;
-
-import junit.framework.TestCase;
-
-import org.apache.qpid.server.model.GroupProvider;
-import org.apache.qpid.test.utils.TestFileUtils;
-
-public class FileGroupManagerFactoryTest extends TestCase
-{
-
- private FileGroupManagerFactory _factory = new FileGroupManagerFactory();
- private Map<String, Object> _configuration = new HashMap<String, Object>();
- private String _emptyButValidGroupFile = TestFileUtils.createTempFile(this).getAbsolutePath();
-
- public void testInstanceCreated() throws Exception
- {
- _configuration.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- _configuration.put(FileGroupManagerFactory.PATH, _emptyButValidGroupFile);
-
- GroupManager manager = _factory.createInstance(_configuration);
- assertNotNull(manager);
- assertTrue(manager instanceof FileGroupManager);
- }
-
- public void testReturnsNullWhenNoConfig() throws Exception
- {
- GroupManager manager = _factory.createInstance(_configuration);
- assertNull(manager);
- }
-
- public void testReturnsNullWhenConfigNotForThisPlugin() throws Exception
- {
- _configuration.put(GroupProvider.TYPE, "other-group-manager");
-
- GroupManager manager = _factory.createInstance(_configuration);
- assertNull(manager);
- }
-
-
- public void testRejectsConfigThatIsMissingAttributeValue() throws Exception
- {
- _configuration.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- _configuration.put(FileGroupManagerFactory.PATH, null);
-
- try
- {
- _factory.createInstance(_configuration);
- fail("Exception not thrown");
- }
- catch (RuntimeException re)
- {
- // PASS
- }
- }
-
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java
deleted file mode 100644
index 152703d548..0000000000
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/security/group/FileGroupManagerTest.java
+++ /dev/null
@@ -1,204 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.security.group;
-
-import java.io.File;
-import java.io.FileOutputStream;
-import java.security.Principal;
-import java.util.Properties;
-import java.util.Set;
-
-import org.apache.qpid.server.security.auth.UsernamePrincipal;
-import org.apache.qpid.test.utils.QpidTestCase;
-
-public class FileGroupManagerTest extends QpidTestCase
-{
- private static final String MYGROUP_USERS = "user1";
- private static final String MY_GROUP = "myGroup.users";
- private static final String MY_GROUP2 = "myGroup2.users";
- private File _tmpGroupFile;
- private FileGroupManager _manager;
-
- @Override
- public void tearDown() throws Exception
- {
- super.tearDown();
-
- if (_tmpGroupFile != null)
- {
- if (_tmpGroupFile.exists())
- {
- _tmpGroupFile.delete();
- }
- }
- }
-
- public void testValidGroupFile() throws Exception
- {
- final String groupFileName = writeGroupFile();
-
- _manager = new FileGroupManager(groupFileName);
- assertNotNull(_manager);
- }
-
- public void testNonExistentGroupFile() throws Exception
- {
- final String filePath = TMP_FOLDER + File.separator + "non.existing";
- File file = new File(filePath);
- if (file.exists())
- {
- file.delete();
- }
- assertFalse("File should not exist", file.exists());
- try
- {
- _manager = new FileGroupManager(filePath);
- assertFalse("File should be created", file.exists());
- _manager.onCreate();
- assertTrue("File should be created", file.exists());
- _manager.open();
- Set<Principal> groups = _manager.getGroupPrincipals();
- assertTrue("No group should exist", groups.isEmpty());
- }
- finally
- {
- file.delete();
- }
- }
-
- public void testGetGroupPrincipalsForUser() throws Exception
- {
- final String groupFileName = writeGroupFile();
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getGroupPrincipalsForUser("user1");
- assertEquals(1, principals.size());
- assertTrue(principals.contains(new GroupPrincipal("myGroup")));
- }
-
- public void testGetUserPrincipalsForGroup() throws Exception
- {
- final String groupFileName = writeGroupFile();
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup");
- assertEquals(1, principals.size());
- assertTrue(principals.contains(new UsernamePrincipal("user1")));
- }
-
- public void testGetGroupPrincipals() throws Exception
- {
- final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS, MY_GROUP2, MYGROUP_USERS);
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getGroupPrincipals();
- assertEquals(2, principals.size());
- assertTrue(principals.contains(new GroupPrincipal("myGroup")));
- assertTrue(principals.contains(new GroupPrincipal("myGroup2")));
- }
-
- public void testCreateGroup() throws Exception
- {
- final String groupFileName = writeGroupFile();
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getGroupPrincipals();
- assertEquals(1, principals.size());
-
- _manager.createGroup("myGroup2");
-
- principals = _manager.getGroupPrincipals();
- assertEquals(2, principals.size());
- assertTrue(principals.contains(new GroupPrincipal("myGroup2")));
- }
-
- public void testRemoveGroup() throws Exception
- {
- final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS);
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getGroupPrincipals();
- assertEquals(1, principals.size());
-
- _manager.removeGroup("myGroup");
-
- principals = _manager.getGroupPrincipals();
- assertEquals(0, principals.size());
- }
-
- public void testAddUserToGroup() throws Exception
- {
- final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS);
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup");
- assertEquals(1, principals.size());
- assertFalse(principals.contains(new UsernamePrincipal("user2")));
-
- _manager.addUserToGroup("user2", "myGroup");
-
- principals = _manager.getUserPrincipalsForGroup("myGroup");
- assertEquals(2, principals.size());
- assertTrue(principals.contains(new UsernamePrincipal("user2")));
- }
-
- public void testRemoveUserInGroup() throws Exception
- {
- final String groupFileName = writeGroupFile(MY_GROUP, MYGROUP_USERS);
- _manager = new FileGroupManager(groupFileName);
- _manager.open();
- Set<Principal> principals = _manager.getUserPrincipalsForGroup("myGroup");
- assertEquals(1, principals.size());
- assertTrue(principals.contains(new UsernamePrincipal("user1")));
-
- _manager.removeUserFromGroup("user1", "myGroup");
-
- principals = _manager.getUserPrincipalsForGroup("myGroup");
- assertEquals(0, principals.size());
- }
-
- private String writeGroupFile() throws Exception
- {
- return writeGroupFile(MY_GROUP, MYGROUP_USERS);
- }
-
- private String writeGroupFile(String... groupAndUsers) throws Exception
- {
- if (groupAndUsers.length % 2 != 0)
- {
- throw new IllegalArgumentException("Number of groupAndUsers must be even");
- }
-
- _tmpGroupFile = File.createTempFile("groups", "grp");
- _tmpGroupFile.deleteOnExit();
-
- Properties props = new Properties();
- for (int i = 0 ; i < groupAndUsers.length; i=i+2)
- {
- String group = groupAndUsers[i];
- String users = groupAndUsers[i+1];
- props.put(group, users);
- }
-
- props.store(new FileOutputStream(_tmpGroupFile), "test group file");
-
- return _tmpGroupFile.getCanonicalPath();
- }
-}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
index 1113f8699b..01f4ed4299 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/util/BrokerTestHelper.java
@@ -21,24 +21,33 @@
package org.apache.qpid.server.util;
import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
import java.net.SocketAddress;
+import java.security.PrivilegedAction;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
+import javax.security.auth.Subject;
+
+import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.store.JsonConfigurationEntryStore;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
-import org.apache.qpid.server.exchange.DefaultExchangeFactory;
import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.logging.EventLogger;
+import org.apache.qpid.server.logging.LogRecorder;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
+import org.apache.qpid.server.model.SystemContext;
+import org.apache.qpid.server.model.SystemContextImpl;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.protocol.AMQConnectionModel;
@@ -46,6 +55,7 @@ import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.SubjectCreator;
+import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.TestMemoryMessageStore;
import org.apache.qpid.server.virtualhost.AbstractVirtualHost;
@@ -68,6 +78,7 @@ public class BrokerTestHelper
public static Broker createBrokerMock()
{
+ ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
SubjectCreator subjectCreator = mock(SubjectCreator.class);
when(subjectCreator.getMechanisms()).thenReturn("");
Broker broker = mock(Broker.class);
@@ -77,7 +88,9 @@ public class BrokerTestHelper
when(broker.getSubjectCreator(any(SocketAddress.class))).thenReturn(subjectCreator);
when(broker.getVirtualHostRegistry()).thenReturn(new VirtualHostRegistry(new EventLogger()));
when(broker.getSecurityManager()).thenReturn(new SecurityManager(mock(Broker.class), false));
+ when(broker.getObjectFactory()).thenReturn(objectFactory);
when(broker.getEventLogger()).thenReturn(new EventLogger());
+ when(broker.getCategoryClass()).thenReturn(Broker.class);
return broker;
}
@@ -94,14 +107,20 @@ public class BrokerTestHelper
{
//VirtualHostFactory factory = new PluggableFactoryLoader<VirtualHostFactory>(VirtualHostFactory.class).get(hostType);
-
+ SystemContext systemContext = new SystemContextImpl(TASK_EXECUTOR,
+ new ConfiguredObjectFactoryImpl(Model.getInstance()),
+ mock(EventLogger.class),
+ mock(LogRecorder.class),
+ new BrokerOptions());
+ ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
Broker broker = mock(Broker.class);
+ when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext);
when(broker.getVirtualHostRegistry()).thenReturn(virtualHostRegistry);
when(broker.getTaskExecutor()).thenReturn(TASK_EXECUTOR);
SecurityManager securityManager = new SecurityManager(broker, false);
when(broker.getSecurityManager()).thenReturn(securityManager);
-
- ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactory(Model.getInstance());
+ when(broker.getCategoryClass()).thenReturn(Broker.class);
+ when(broker.getObjectFactory()).thenReturn(objectFactory);
ConfiguredObjectTypeFactory factory = objectFactory.getConfiguredObjectTypeFactory(org.apache.qpid.server.model.VirtualHost.class,
attributes);
@@ -165,17 +184,29 @@ public class BrokerTestHelper
public static ExchangeImpl createExchange(String hostName, final boolean durable, final EventLogger eventLogger) throws Exception
{
SecurityManager securityManager = new SecurityManager(mock(Broker.class), false);
- VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
+ final VirtualHostImpl virtualHost = mock(VirtualHostImpl.class);
when(virtualHost.getName()).thenReturn(hostName);
when(virtualHost.getSecurityManager()).thenReturn(securityManager);
when(virtualHost.getEventLogger()).thenReturn(eventLogger);
- DefaultExchangeFactory factory = new DefaultExchangeFactory(virtualHost);
- Map<String,Object> attributes = new HashMap<String, Object>();
+ when(virtualHost.getDurableConfigurationStore()).thenReturn(mock(DurableConfigurationStore.class));
+ ConfiguredObjectFactory objectFactory = new ConfiguredObjectFactoryImpl(Model.getInstance());
+ final Map<String,Object> attributes = new HashMap<String, Object>();
attributes.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID("amp.direct", virtualHost.getName()));
attributes.put(org.apache.qpid.server.model.Exchange.NAME, "amq.direct");
attributes.put(org.apache.qpid.server.model.Exchange.TYPE, "direct");
attributes.put(org.apache.qpid.server.model.Exchange.DURABLE, durable);
- return factory.createExchange(attributes);
+ final ConfiguredObjectTypeFactory<? extends Exchange> exchangeFactory =
+ objectFactory.getConfiguredObjectTypeFactory(Exchange.class, attributes);
+ return Subject.doAs(SecurityManager.getSubjectWithAddedSystemRights(), new PrivilegedAction<ExchangeImpl>()
+ {
+ @Override
+ public ExchangeImpl run()
+ {
+
+ return (ExchangeImpl) exchangeFactory.create(attributes, virtualHost);
+ }
+ });
+
}
public static AMQQueue createQueue(String queueName, VirtualHostImpl virtualHost)
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
index e6b57d8039..8cbf999dfb 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/DurableConfigurationRecovererTest.java
@@ -20,6 +20,15 @@
*/
package org.apache.qpid.server.virtualhost;
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyMap;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
@@ -28,43 +37,39 @@ import java.util.LinkedHashMap;
import java.util.Map;
import java.util.UUID;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.logging.EventLogger;
-import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
-import org.apache.qpid.server.store.StoreException;
+import org.mockito.ArgumentCaptor;
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
import org.apache.qpid.server.configuration.IllegalConfigurationException;
-import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.DirectExchange;
-import org.apache.qpid.server.exchange.ExchangeFactory;
-import org.apache.qpid.server.exchange.ExchangeRegistry;
-import org.apache.qpid.server.exchange.FanoutExchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.exchange.TopicExchange;
+import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.model.Binding;
+import org.apache.qpid.server.model.Broker;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Queue;
+import org.apache.qpid.server.model.SystemContext;
import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.ConfiguredObjectTypeFactory;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
+import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.DurableConfigurationRecoverer;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.DurableConfiguredObjectRecoverer;
+import org.apache.qpid.server.store.StoreException;
+import org.apache.qpid.server.store.UnresolvedConfiguredObject;
import org.apache.qpid.server.util.MapValueConverter;
import org.apache.qpid.test.utils.QpidTestCase;
-import org.mockito.ArgumentCaptor;
-import org.mockito.invocation.InvocationOnMock;
-import org.mockito.stubbing.Answer;
-
-import static org.mockito.Matchers.any;
-import static org.mockito.Matchers.anyBoolean;
-import static org.mockito.Matchers.eq;
-import static org.mockito.Mockito.doAnswer;
-import static org.mockito.Mockito.doThrow;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.when;
public class DurableConfigurationRecovererTest extends QpidTestCase
{
@@ -76,50 +81,55 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
private static final String CUSTOM_EXCHANGE_NAME = "customExchange";
private DurableConfigurationRecoverer _durableConfigurationRecoverer;
- private ExchangeImpl<?> _directExchange;
- private ExchangeImpl<?> _topicExchange;
- private ExchangeImpl<?> _matchExchange;
- private ExchangeImpl<?> _fanoutExchange;
private VirtualHostImpl _vhost;
private DurableConfigurationStore _store;
- private ExchangeFactory _exchangeFactory;
- private ExchangeRegistry _exchangeRegistry;
private QueueFactory _queueFactory;
+ private ConfiguredObjectFactory _configuredObjectFactory;
+ private ConfiguredObjectTypeFactory _exchangeFactory;
@Override
public void setUp() throws Exception
{
super.setUp();
-
- _exchangeFactory = mock(ExchangeFactory.class);
-
- _directExchange = createAndRegisterDefaultExchangeWithFactory(DirectExchange.TYPE);
- _topicExchange = createAndRegisterDefaultExchangeWithFactory(TopicExchange.TYPE);
- _matchExchange = createAndRegisterDefaultExchangeWithFactory(HeadersExchange.TYPE);
- _fanoutExchange = createAndRegisterDefaultExchangeWithFactory(FanoutExchange.TYPE);
+ _configuredObjectFactory = mock(ConfiguredObjectFactory.class);
+ _exchangeFactory = mock(ConfiguredObjectTypeFactory.class);
AMQQueue<?> queue = mock(AMQQueue.class);
_vhost = mock(VirtualHostImpl.class);
when(_vhost.getName()).thenReturn(VIRTUAL_HOST_NAME);
-
- _exchangeRegistry = mock(ExchangeRegistry.class);
+ final Broker<?> broker = mock(Broker.class);
+ final SystemContext systemContext = mock(SystemContext.class);
+ when(systemContext.getObjectFactory()).thenReturn(_configuredObjectFactory);
+ when(broker.getObjectFactory()).thenReturn(_configuredObjectFactory);
+ when(broker.getParent(eq(SystemContext.class))).thenReturn(systemContext);
+ when(_vhost.getParent(eq(Broker.class))).thenReturn(broker);
when(_vhost.getQueue(eq(QUEUE_ID))).thenReturn(queue);
- final ArgumentCaptor<ExchangeImpl> registeredExchange = ArgumentCaptor.forClass(ExchangeImpl.class);
+ when(_configuredObjectFactory.getConfiguredObjectTypeFactory(eq(Exchange.class), anyMap())).thenReturn(_exchangeFactory);
+
+ final ArgumentCaptor<ConfiguredObjectRecord> recoveredExchange = ArgumentCaptor.forClass(ConfiguredObjectRecord.class);
doAnswer(new Answer()
{
@Override
public Object answer(final InvocationOnMock invocation) throws Throwable
{
- ExchangeImpl exchange = registeredExchange.getValue();
- when(_exchangeRegistry.getExchange(eq(exchange.getId()))).thenReturn(exchange);
- when(_exchangeRegistry.getExchange(eq(exchange.getName()))).thenReturn(exchange);
- return null;
+ ConfiguredObjectRecord exchangeRecord = recoveredExchange.getValue();
+ ExchangeImpl exchange = mock(ExchangeImpl.class);
+ UUID id = exchangeRecord.getId();
+ String name = (String) exchangeRecord.getAttributes().get("name");
+ when(exchange.getId()).thenReturn(id);
+ when(exchange.getName()).thenReturn(name);
+ when(_vhost.getExchange(eq(id))).thenReturn(exchange);
+ when(_vhost.getExchange(eq(name))).thenReturn(exchange);
+
+ UnresolvedConfiguredObject unresolved = mock(UnresolvedConfiguredObject.class);
+ when(unresolved.resolve()).thenReturn(exchange);
+ return unresolved;
}
- }).when(_exchangeRegistry).registerExchange(registeredExchange.capture());
+ }).when(_exchangeFactory).recover(recoveredExchange.capture(), any(ConfiguredObject.class));
@@ -164,7 +174,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
{
final UUID exchangeId = UUID.fromString(args.get(Queue.ALTERNATE_EXCHANGE).toString());
final ExchangeImpl exchange =
- (ExchangeImpl) _exchangeRegistry.getExchange(exchangeId);
+ (ExchangeImpl) _vhost.getExchange(exchangeId);
queue.setAlternateExchange(exchange);
}
return queue;
@@ -174,9 +184,9 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
DurableConfiguredObjectRecoverer[] recoverers = {
- new QueueRecoverer(_vhost, _exchangeRegistry, _queueFactory),
- new ExchangeRecoverer(_exchangeRegistry, _exchangeFactory),
- new BindingRecoverer(_vhost, _exchangeRegistry)
+ new QueueRecoverer(_vhost, _queueFactory),
+ new ExchangeRecoverer(_vhost),
+ new BindingRecoverer(_vhost)
};
final Map<String, DurableConfiguredObjectRecoverer> recovererMap= new HashMap<String, DurableConfiguredObjectRecoverer>();
@@ -192,19 +202,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
- private ExchangeImpl<?> createAndRegisterDefaultExchangeWithFactory(ExchangeType<?> exchangeType) throws AMQUnknownExchangeType, UnknownExchangeException
- {
- ExchangeImpl exchange = mock(ExchangeImpl.class);
- when(exchange.getExchangeType()).thenReturn(exchangeType);
- Map<String, Object> directExchangeAttrsWithId = new HashMap<String, Object>();
- directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.ID, UUIDGenerator.generateExchangeUUID(exchangeType.getDefaultExchangeName(), VIRTUAL_HOST_NAME));
- directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.DURABLE, true);
- directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.TYPE, exchangeType.getType());
- directExchangeAttrsWithId.put(org.apache.qpid.server.model.Exchange.NAME, exchangeType.getDefaultExchangeName());
- when(_exchangeFactory.restoreExchange(directExchangeAttrsWithId)).thenReturn(exchange);
- return exchange;
- }
-
public void testUpgradeEmptyStore() throws Exception
{
_durableConfigurationRecoverer.beginConfigurationRecovery(_store);
@@ -295,47 +292,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
"org.apache.qpid.server.model.Exchange",
createExchange(CUSTOM_EXCHANGE_NAME, HeadersExchange.TYPE)));
- final ExchangeImpl customExchange = mock(ExchangeImpl.class);
-
- final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
- when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>()
- {
- @Override
- public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
- {
- Map arguments = attributesCaptor.getValue();
- String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME);
- if(CUSTOM_EXCHANGE_NAME.equals(exchangeName)
- && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
- && customExchangeId.equals((UUID) arguments.get(org.apache.qpid.server.model.Exchange.ID)))
- {
- return customExchange;
- }
- else if ("amq.topic".equals(exchangeName))
- {
- return _topicExchange;
- }
- else if ("amq.direct".equals(exchangeName))
- {
- return _directExchange;
- }
- else if ("amq.fanout".equals(exchangeName))
- {
- return _fanoutExchange;
- }
- else if ("amq.match".equals(exchangeName))
- {
- return _matchExchange;
- }
- else
- {
- return null;
- }
- }
- });
-
-
-
final ConfiguredObjectRecord[] expected = {
new ConfiguredObjectRecordImpl(new UUID(1, 0), "org.apache.qpid.server.model.Binding",
createBinding("key", "not-a-selector", "moo")),
@@ -443,49 +399,6 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
final UUID queueId = new UUID(1, 0);
final UUID exchangeId = new UUID(2, 0);
- final ExchangeImpl customExchange = mock(ExchangeImpl.class);
-
- when(customExchange.getId()).thenReturn(exchangeId);
- when(customExchange.getName()).thenReturn(CUSTOM_EXCHANGE_NAME);
-
- final ArgumentCaptor<Map> attributesCaptor = ArgumentCaptor.forClass(Map.class);
-
- when(_exchangeFactory.restoreExchange(attributesCaptor.capture())).thenAnswer(new Answer<ExchangeImpl>()
- {
- @Override
- public ExchangeImpl answer(final InvocationOnMock invocation) throws Throwable
- {
- Map arguments = attributesCaptor.getValue();
- String exchangeName = (String) arguments.get(org.apache.qpid.server.model.Exchange.NAME);
- if(CUSTOM_EXCHANGE_NAME.equals(exchangeName)
- && HeadersExchange.TYPE.getType().equals(arguments.get(org.apache.qpid.server.model.Exchange.TYPE))
- && exchangeId.equals(arguments.get(org.apache.qpid.server.model.Exchange.ID)))
- {
- return customExchange;
- }
- else if ("amq.topic".equals(exchangeName))
- {
- return _topicExchange;
- }
- else if ("amq.direct".equals(exchangeName))
- {
- return _directExchange;
- }
- else if ("amq.fanout".equals(exchangeName))
- {
- return _fanoutExchange;
- }
- else if ("amq.match".equals(exchangeName))
- {
- return _matchExchange;
- }
- else
- {
- return null;
- }
- }
- });
-
_durableConfigurationRecoverer.beginConfigurationRecovery(_store);
_durableConfigurationRecoverer.configuredObject(new ConfiguredObjectRecordImpl(queueId, Queue.class.getSimpleName(),
@@ -496,7 +409,7 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
_durableConfigurationRecoverer.completeConfigurationRecovery();
- assertEquals(customExchange, _vhost.getQueue(queueId).getAlternateExchange());
+ assertEquals(CUSTOM_EXCHANGE_NAME, _vhost.getQueue(queueId).getAlternateExchange().getName());
}
private void verifyCorrectUpdates(final ConfiguredObjectRecord[] expected) throws StoreException
@@ -578,4 +491,5 @@ public class DurableConfigurationRecovererTest extends QpidTestCase
}
+
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
index 37b464be40..85eede527a 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/virtualhost/MockVirtualHost.java
@@ -35,12 +35,12 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.ConfiguredObjectAttribute;
+import org.apache.qpid.server.model.ConfiguredObjectFactory;
import org.apache.qpid.server.model.Connection;
import org.apache.qpid.server.model.IllegalStateTransitionException;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHostAlias;
-import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.protocol.LinkRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueRegistry;
@@ -433,20 +433,15 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
return null;
}
- @Override
- public Collection<ExchangeType<? extends ExchangeImpl>> getExchangeTypes()
- {
- return null;
- }
-
public SecurityManager getSecurityManager()
{
return null;
}
@Override
- public void addVirtualHostListener(VirtualHostListener listener)
+ public ConfiguredObjectFactory getObjectFactory()
{
+ return null;
}
public LinkRegistry getLinkRegistry(String remoteContainerId)
@@ -535,6 +530,18 @@ public class MockVirtualHost implements VirtualHostImpl<MockVirtualHost, AMQQueu
}
@Override
+ public <C extends ConfiguredObject> C getChildById(final Class<C> clazz, final UUID id)
+ {
+ return null;
+ }
+
+ @Override
+ public <C extends ConfiguredObject> C getChildByName(final Class<C> clazz, final String name)
+ {
+ return null;
+ }
+
+ @Override
public <C extends ConfiguredObject> C createChild(final Class<C> childClass,
final Map<String, Object> attributes,
final ConfiguredObject... otherParents)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
index 718a1d0b9b..275ab4416e 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java
@@ -20,6 +20,10 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
+import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
+
import java.net.SocketAddress;
import java.security.Principal;
import java.security.PrivilegedAction;
@@ -29,7 +33,9 @@ import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
+
import javax.security.auth.Subject;
+
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.connection.ConnectionPrincipal;
import org.apache.qpid.server.logging.EventLogger;
@@ -55,10 +61,6 @@ import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.ProtocolEvent;
import org.apache.qpid.transport.Session;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SOCKET_FORMAT;
-import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.USER_FORMAT;
-
public class ServerConnection extends Connection implements AMQConnectionModel<ServerConnection, ServerSession>,
LogSubject, AuthorizationHolder
{
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
index 0dc1a822e9..c2ef68d812 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnectionDelegate.java
@@ -299,6 +299,7 @@ public class ServerConnectionDelegate extends ServerDelegate
stopAllSubscriptions(conn, dtc);
Session ssn = conn.getSession(dtc.getChannel());
((ServerSession)ssn).setClose(true);
+ ((ServerSession)ssn).getModelObject().delete();
super.sessionDetach(conn, dtc);
}
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
index 07345e7f0a..187b2bf569 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java
@@ -130,6 +130,7 @@ public class ServerSession extends Session
private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
+ private org.apache.qpid.server.model.Session<?> _modelObject;
public static interface MessageDispositionChangeListener
@@ -859,6 +860,10 @@ public class ServerSession extends Session
// unregister subscriptions in order to prevent sending of new messages
// to subscriptions with closing session
unregisterSubscriptions();
+ if(_modelObject != null)
+ {
+ _modelObject.delete();
+ }
super.close();
}
@@ -1003,6 +1008,18 @@ public class ServerSession extends Session
_consumerListeners.remove(listener);
}
+ @Override
+ public void setModelObject(final org.apache.qpid.server.model.Session<?> session)
+ {
+ _modelObject = session;
+ }
+
+ @Override
+ public org.apache.qpid.server.model.Session<?> getModelObject()
+ {
+ return _modelObject;
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
index 4fa4dcaa11..200519a285 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java
@@ -20,20 +20,22 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.nio.ByteBuffer;
import java.security.AccessControlException;
+import java.util.Collection;
import java.util.EnumSet;
import java.util.HashMap;
import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
import java.util.UUID;
+
import org.apache.log4j.Logger;
import org.apache.qpid.server.consumer.ConsumerImpl;
+import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.DirectExchange;
import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.store.StoreException;
-import org.apache.qpid.server.exchange.AMQUnknownExchangeType;
import org.apache.qpid.server.exchange.HeadersExchange;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
@@ -43,13 +45,15 @@ import org.apache.qpid.server.message.InstanceProperties;
import org.apache.qpid.server.message.MessageDestination;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.MessageSource;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.plugin.ExchangeType;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.StoreFuture;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AlreadyKnownDtxException;
@@ -63,14 +67,15 @@ import org.apache.qpid.server.txn.SuspendAndFailDtxException;
import org.apache.qpid.server.txn.TimeoutDtxException;
import org.apache.qpid.server.txn.UnknownDtxBranchException;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.*;
+import org.apache.qpid.server.virtualhost.ExchangeExistsException;
+import org.apache.qpid.server.virtualhost.ExchangeIsAlternateException;
+import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
+import org.apache.qpid.server.virtualhost.ReservedExchangeNameException;
+import org.apache.qpid.server.virtualhost.UnknownExchangeException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.transport.*;
-import java.nio.ByteBuffer;
-import java.util.Collection;
-import java.util.List;
-import java.util.Map;
-
public class ServerSessionDelegate extends SessionDelegate
{
private static final Logger LOGGER = Logger.getLogger(ServerSessionDelegate.class);
@@ -1278,7 +1283,7 @@ public class ServerSessionDelegate extends SessionDelegate
arguments.put(Queue.ALTERNATE_EXCHANGE, alternateExchangeName);
}
- final UUID id = UUIDGenerator.generateQueueUUID(queueName, virtualHost.getName());
+ final UUID id = UUID.randomUUID();
arguments.put(Queue.ID, id);
arguments.put(Queue.NAME, queueName);
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index b852b22abb..70094ea7c7 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -86,6 +86,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.CapacityChecker;
@@ -195,6 +196,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
+ private Session<?> _modelObject;
public AMQChannel(T session, int channelId, final MessageStore messageStore)
@@ -737,6 +739,10 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
_transaction.rollback();
+ if(_modelObject != null)
+ {
+ _modelObject.delete();
+ }
try
{
@@ -1759,4 +1765,16 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
_consumerListeners.remove(listener);
}
+
+ @Override
+ public void setModelObject(final Session<?> session)
+ {
+ _modelObject = session;
+ }
+
+ @Override
+ public Session<?> getModelObject()
+ {
+ return _modelObject;
+ }
}
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
index 42cb66ce7e..ef8d01d89f 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/QueueDeclareHandler.java
@@ -36,7 +36,6 @@ import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.model.ExclusivityPolicy;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.Queue;
-import org.apache.qpid.server.model.UUIDGenerator;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.v0_8.AMQChannel;
import org.apache.qpid.server.protocol.v0_8.AMQProtocolSession;
@@ -192,7 +191,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener<QueueDeclar
QueueArgumentsConverter.convertWireArgsToModel(FieldTable.convertToMap(body.getArguments()));
final String queueNameString = AMQShortString.toString(queueName);
attributes.put(Queue.NAME, queueNameString);
- attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(queueNameString, virtualHost.getName()));
+ attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.DURABLE, durable);
LifetimePolicy lifetimePolicy;
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
index 2322865c80..7f4a3701cd 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java
@@ -272,11 +272,13 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr
}
}
+ private static final AtomicInteger portNumber = new AtomicInteger(0);
+
private static class TestNetworkConnection implements NetworkConnection
{
private String _remoteHost = "127.0.0.1";
private String _localHost = "127.0.0.1";
- private int _port = 1;
+ private int _port = portNumber.incrementAndGet();
private final Sender<ByteBuffer> _sender;
public TestNetworkConnection()
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
index aa0c3f2e4b..f6823824fd 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java
@@ -21,16 +21,18 @@
package org.apache.qpid.server.protocol.v1_0;
import java.security.AccessControlException;
-import java.util.*;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.binding.BindingImpl;
-import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
-import org.apache.qpid.server.model.ExclusivityPolicy;
-import org.apache.qpid.server.model.LifetimePolicy;
-import org.apache.qpid.server.model.Queue;
+
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
@@ -41,26 +43,41 @@ import org.apache.qpid.amqp_1_0.type.DeliveryState;
import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.Symbol;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
-import org.apache.qpid.amqp_1_0.type.messaging.*;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Filter;
+import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Modified;
+import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter;
+import org.apache.qpid.amqp_1_0.type.messaging.Released;
+import org.apache.qpid.amqp_1_0.type.messaging.Source;
+import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode;
+import org.apache.qpid.amqp_1_0.type.messaging.Target;
+import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability;
import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.Error;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.filter.SelectorParsingException;
import org.apache.qpid.filter.selector.ParseException;
+import org.apache.qpid.server.binding.BindingImpl;
+import org.apache.qpid.server.consumer.ConsumerImpl;
import org.apache.qpid.server.exchange.DirectExchange;
+import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.exchange.TopicExchange;
import org.apache.qpid.server.filter.JMSSelectorFilter;
import org.apache.qpid.server.filter.SimpleFilterManager;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.MessageSource;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.ExclusivityPolicy;
+import org.apache.qpid.server.model.LifetimePolicy;
+import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.server.virtualhost.VirtualHostImpl;
import org.apache.qpid.server.virtualhost.QueueExistsException;
+import org.apache.qpid.server.virtualhost.VirtualHostImpl;
public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryStateHandler
{
@@ -204,7 +221,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS
if(queue == null)
{
Map<String,Object> attributes = new HashMap<String,Object>();
- attributes.put(Queue.ID, UUIDGenerator.generateQueueUUID(name, _vhost.getName()));
+ attributes.put(Queue.ID, UUID.randomUUID());
attributes.put(Queue.NAME, name);
attributes.put(Queue.DURABLE, isDurable);
attributes.put(Queue.LIFETIME_POLICY, LifetimePolicy.DELETE_ON_NO_OUTBOUND_LINKS);
diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
index 36aab5ddac..2c7884b3ce 100644
--- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
+++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java
@@ -78,7 +78,7 @@ import org.apache.qpid.server.message.MessageSource;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
-import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.protocol.AMQSessionModel;
import org.apache.qpid.server.protocol.ConsumerListener;
import org.apache.qpid.server.protocol.LinkRegistry;
@@ -112,6 +112,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>();
private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener();
private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>();
+ private Session<?> _modelObject;
public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint)
@@ -433,7 +434,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
? null
: (LifetimePolicy) properties.get(LIFETIME_POLICY);
Map<String,Object> attributes = new HashMap<String,Object>();
- attributes.put(org.apache.qpid.server.model.Queue.ID, UUIDGenerator.generateQueueUUID(queueName, getVirtualHost().getName()));
+ attributes.put(org.apache.qpid.server.model.Queue.ID, UUID.randomUUID());
attributes.put(org.apache.qpid.server.model.Queue.NAME, queueName);
attributes.put(org.apache.qpid.server.model.Queue.DURABLE, false);
@@ -570,6 +571,10 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
{
performCloseTasks();
_endpoint.end();
+ if(_modelObject != null)
+ {
+ _modelObject.delete();
+ }
}
protected void performCloseTasks()
@@ -844,6 +849,18 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio
_consumerListeners.remove(listener);
}
+ @Override
+ public void setModelObject(final Session<?> session)
+ {
+ _modelObject = session;
+ }
+
+ @Override
+ public Session<?> getModelObject()
+ {
+ return _modelObject;
+ }
+
private void consumerAdded(Consumer<?> consumer)
{
for(ConsumerListener l : _consumerListeners)
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
index 8f282c0d50..0f19b097e7 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/VirtualHostManagerMBean.java
@@ -49,6 +49,7 @@ import org.apache.qpid.server.model.Queue;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.VirtualHost;
import org.apache.qpid.server.queue.QueueArgumentsConverter;
+import org.apache.qpid.server.virtualhost.RequiredExchangeException;
@MBeanDescription("This MBean exposes the broker level management features")
public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<VirtualHost> implements ManagedBroker
@@ -185,6 +186,10 @@ public class VirtualHostManagerMBean extends AbstractStatisticsGatheringMBean<Vi
{
theExchange.delete();
}
+ catch(RequiredExchangeException e)
+ {
+ throw new UnsupportedOperationException(e.getMessage(), e);
+ }
catch (IllegalStateException ex)
{
final JMException jme = new JMException(ex.toString());
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
index d9320a93d0..7a954c0185 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/GroupProviderRestTest.java
@@ -36,7 +36,8 @@ import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.LifetimePolicy;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.security.group.FileGroupManagerFactory;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProvider;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl;
import org.apache.qpid.test.utils.TestBrokerConfiguration;
import org.apache.qpid.test.utils.TestFileUtils;
@@ -76,11 +77,11 @@ public class GroupProviderRestTest extends QpidRestTestCase
assertEquals("Unexpected number of providers", 1, providerDetails.size());
for (Map<String, Object> provider : providerDetails)
{
- assertProvider(FILE_GROUP_MANAGER, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, provider);
+ assertProvider(FILE_GROUP_MANAGER, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, provider);
Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/"
+ provider.get(GroupProvider.NAME));
assertNotNull("Cannot load data for " + provider.get(GroupProvider.NAME), data);
- assertProvider(FILE_GROUP_MANAGER, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, data);
+ assertProvider(FILE_GROUP_MANAGER, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, data);
}
}
@@ -127,16 +128,16 @@ public class GroupProviderRestTest extends QpidRestTestCase
String providerName = getTestName();
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath());
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Group provider was not created", 201, responseCode);
Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + providerName + "?depth=2");
- assertProvider(providerName, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE, data);
+ assertProvider(providerName, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE, data);
assertEquals("Unexpected name", providerName, data.get(GroupProvider.NAME));
- assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileGroupManagerFactory.PATH));
+ assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileBasedGroupProvider.PATH));
@SuppressWarnings("unchecked")
List<Map<String, Object>> groups = (List<Map<String, Object>>) data.get("groups");
@@ -174,7 +175,7 @@ public class GroupProviderRestTest extends QpidRestTestCase
String providerName = getTestName();
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Group provider was created", 409, responseCode);
@@ -189,15 +190,15 @@ public class GroupProviderRestTest extends QpidRestTestCase
String providerName = getTestName();
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath());
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Group provider was not created", 201, responseCode);
Map<String, Object> data = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + providerName);
assertEquals("Unexpected name", providerName, data.get(GroupProvider.NAME));
- assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileGroupManagerFactory.PATH));
+ assertEquals("Unexpected path", groupFile.getAbsolutePath(), data.get(FileBasedGroupProvider.PATH));
@SuppressWarnings("unchecked")
List<Map<String, Object>> groups = (List<Map<String, Object>>) data.get("groups");
@@ -220,8 +221,8 @@ public class GroupProviderRestTest extends QpidRestTestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath());
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Group provider was not created", 201, responseCode);
@@ -244,8 +245,8 @@ public class GroupProviderRestTest extends QpidRestTestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath());
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Expected to fail because we can have only one password provider", 201, responseCode);
@@ -271,14 +272,14 @@ public class GroupProviderRestTest extends QpidRestTestCase
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, providerName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFile.getAbsolutePath());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFile.getAbsolutePath());
int responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Expected to fail because we can have only one password provider", 201, responseCode);
File newGroupFile = new File(TMP_FOLDER + File.separator + getTestName() + File.separator + "groups");
- attributes.put(FileGroupManagerFactory.PATH, newGroupFile.getAbsolutePath());
+ attributes.put(FileBasedGroupProvider.PATH, newGroupFile.getAbsolutePath());
responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + providerName, "PUT", attributes);
assertEquals("Expected to fail because we can have only one password provider", 409, responseCode);
@@ -310,7 +311,7 @@ public class GroupProviderRestTest extends QpidRestTestCase
Map<String, Object> groupProvider = getRestTestHelper().getJsonAsSingletonList("/rest/groupprovider/" + TestBrokerConfiguration.ENTRY_NAME_GROUP_FILE);
assertEquals("Unexpected id", id.toString(), groupProvider.get(GroupProvider.ID));
- assertEquals("Unexpected path", file.getAbsolutePath() , groupProvider.get(FileGroupManagerFactory.PATH));
+ assertEquals("Unexpected path", file.getAbsolutePath() , groupProvider.get(FileBasedGroupProvider.PATH));
assertEquals("Unexpected state", State.ERRORED.name() , groupProvider.get(GroupProvider.STATE));
int status = getRestTestHelper().submitRequest("/rest/groupprovider/" + TestBrokerConfiguration.ENTRY_NAME_GROUP_FILE, "DELETE", null);
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
index 1a39d9c3b0..541160cd80 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/systest/rest/acl/BrokerACLTest.java
@@ -41,13 +41,14 @@ import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.TrustStore;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProvider;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl;
import org.apache.qpid.server.security.FileKeyStore;
import org.apache.qpid.server.security.FileTrustStore;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
import org.apache.qpid.server.security.acl.AbstractACLTestCase;
import org.apache.qpid.server.security.auth.manager.AnonymousAuthenticationManagerFactory;
import org.apache.qpid.server.security.auth.manager.PlainPasswordFileAuthenticationManagerFactory;
-import org.apache.qpid.server.security.group.FileGroupManagerFactory;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.virtualhost.StandardVirtualHost;
import org.apache.qpid.systest.rest.QpidRestTestCase;
@@ -738,8 +739,8 @@ public class BrokerACLTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, groupProviderName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, "/path/to/file");
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, "/path/to/file");
responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes);
assertEquals("Setting of group provider attributes should be allowed but not supported", 409, responseCode);
}
@@ -761,8 +762,8 @@ public class BrokerACLTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, groupProviderName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, "/path/to/file");
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, "/path/to/file");
responseCode = getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes);
assertEquals("Setting of group provider attributes should be denied", 403, responseCode);
}
@@ -852,8 +853,8 @@ public class BrokerACLTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, accessControlProviderName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, "/path/to/file");
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, "/path/to/file");
responseCode = getRestTestHelper().submitRequest("/rest/accesscontrolprovider/" + accessControlProviderName, "PUT", attributes);
assertEquals("Setting of access control provider attributes should be allowed but not supported", 409, responseCode);
}
@@ -875,8 +876,8 @@ public class BrokerACLTest extends QpidRestTestCase
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, accessControlProviderName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, "/path/to/file");
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, "/path/to/file");
responseCode = getRestTestHelper().submitRequest("/rest/accesscontrolprovider/" + accessControlProviderName, "PUT", attributes);
assertEquals("Setting of access control provider attributes should be denied", 403, responseCode);
}
@@ -1073,8 +1074,8 @@ public class BrokerACLTest extends QpidRestTestCase
File file = TestFileUtils.createTempFile(this, ".groups");
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, groupProviderName);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, file.getAbsoluteFile());
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, file.getAbsoluteFile());
return getRestTestHelper().submitRequest("/rest/groupprovider/" + groupProviderName, "PUT", attributes);
}
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
index c31f2eed3d..0e9256e1c6 100755
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/QpidBrokerTestCase.java
@@ -62,6 +62,7 @@ import org.apache.qpid.jms.ConnectionURL;
import org.apache.qpid.server.Broker;
import org.apache.qpid.server.BrokerOptions;
import org.apache.qpid.server.configuration.BrokerProperties;
+import org.apache.qpid.server.configuration.updater.TaskExecutor;
import org.apache.qpid.server.model.Port;
import org.apache.qpid.server.model.Protocol;
import org.apache.qpid.server.model.VirtualHost;
@@ -77,6 +78,8 @@ import org.apache.qpid.util.SystemUtils;
*/
public class QpidBrokerTestCase extends QpidTestCase
{
+ private TaskExecutor _taskExecutor;
+
public enum BrokerType
{
EXTERNAL /** Test case relies on a Broker started independently of the test-suite */,
@@ -226,7 +229,12 @@ public class QpidBrokerTestCase extends QpidTestCase
public TestBrokerConfiguration createBrokerConfiguration(int port)
{
int actualPort = getPort(port);
- TestBrokerConfiguration configuration = new TestBrokerConfiguration(System.getProperty(_brokerStoreType), _configFile.getAbsolutePath());
+ if(_taskExecutor == null)
+ {
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
+ }
+ TestBrokerConfiguration configuration = new TestBrokerConfiguration(System.getProperty(_brokerStoreType), _configFile.getAbsolutePath(), _taskExecutor);
synchronized (_brokerConfigurations)
{
_brokerConfigurations.put(actualPort, configuration);
@@ -341,7 +349,8 @@ public class QpidBrokerTestCase extends QpidTestCase
protected void setUp() throws Exception
{
super.setUp();
-
+ _taskExecutor = new TaskExecutor();
+ _taskExecutor.start();
if (!_configFile.exists())
{
fail("Unable to test without config file:" + _configFile);
@@ -1192,6 +1201,10 @@ public class QpidBrokerTestCase extends QpidTestCase
{
c.close();
}
+ if(_taskExecutor != null)
+ {
+ _taskExecutor.stop();
+ }
}
/**
diff --git a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
index fd62ce75b9..3feb2eaab9 100644
--- a/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
+++ b/qpid/java/systests/src/main/java/org/apache/qpid/test/utils/TestBrokerConfiguration.java
@@ -37,15 +37,16 @@ import org.apache.qpid.server.model.AccessControlProvider;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.ConfiguredObjectFactory;
+import org.apache.qpid.server.model.ConfiguredObjectFactoryImpl;
import org.apache.qpid.server.model.GroupProvider;
import org.apache.qpid.server.model.Model;
import org.apache.qpid.server.model.Plugin;
import org.apache.qpid.server.model.PreferencesProvider;
import org.apache.qpid.server.model.SystemContextImpl;
import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProvider;
+import org.apache.qpid.server.model.adapter.FileBasedGroupProviderImpl;
import org.apache.qpid.server.security.access.FileAccessControlProviderConstants;
-import org.apache.qpid.server.security.group.FileGroupManagerFactory;
import org.apache.qpid.server.store.ConfiguredObjectRecord;
import org.apache.qpid.server.store.ConfiguredObjectRecordImpl;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
@@ -73,9 +74,9 @@ public class TestBrokerConfiguration
private MemoryConfigurationEntryStore _store;
private boolean _saved;
- public TestBrokerConfiguration(String storeType, String intialStoreLocation)
+ public TestBrokerConfiguration(String storeType, String intialStoreLocation, final TaskExecutor taskExecutor)
{
- _store = new MemoryConfigurationEntryStore(new SystemContextImpl(new TaskExecutor(), new ConfiguredObjectFactory(
+ _store = new MemoryConfigurationEntryStore(new SystemContextImpl(taskExecutor, new ConfiguredObjectFactoryImpl(
Model.getInstance()),
mock(EventLogger.class), mock(LogRecorder.class),
mock(BrokerOptions.class)),
@@ -163,8 +164,8 @@ public class TestBrokerConfiguration
{
Map<String, Object> attributes = new HashMap<String, Object>();
attributes.put(GroupProvider.NAME, ENTRY_NAME_GROUP_FILE);
- attributes.put(GroupProvider.TYPE, FileGroupManagerFactory.GROUP_FILE_PROVIDER_TYPE);
- attributes.put(FileGroupManagerFactory.PATH, groupFilePath);
+ attributes.put(GroupProvider.TYPE, FileBasedGroupProviderImpl.GROUP_FILE_PROVIDER_TYPE);
+ attributes.put(FileBasedGroupProvider.PATH, groupFilePath);
return addObjectConfiguration(GroupProvider.class, attributes);
}