From 5b060554268c763cb883a102b04be21741551161 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Mon, 28 Jan 2008 16:48:00 +0000 Subject: Merged revisions 608477,609961,610475,610479,610806,611146 via svnmerge from https://svn.apache.org/repos/asf/incubator/qpid/branches/M2 ........ r608477 | rgodfrey | 2008-01-03 13:23:04 +0000 (Thu, 03 Jan 2008) | 1 line QPID-499 : Added per-virtual host timed tasks to inspect queues (with no consumers) for expired messages ........ r609961 | ritchiem | 2008-01-08 12:59:01 +0000 (Tue, 08 Jan 2008) | 2 lines QPID-499 : Patch to update the queue size statistics when the Active TTL process runs Removed old single commented out code line from AMQSession. ........ r610475 | ritchiem | 2008-01-09 17:32:43 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-723 Added exec to qpid.start ........ r610479 | ritchiem | 2008-01-09 17:39:54 +0000 (Wed, 09 Jan 2008) | 1 line Qpid-690 : Provide configurable delay between re-connecion attempts. ........ r610806 | ritchiem | 2008-01-10 14:41:37 +0000 (Thu, 10 Jan 2008) | 1 line QPID-690 : Relaxed the timings on failover as Thread.sleep is accurate to 10ms so may finish the sleep 10ms early. Resulting in erratic failures as 9.9s < 10s. ........ r611146 | ritchiem | 2008-01-11 11:33:31 +0000 (Fri, 11 Jan 2008) | 1 line Patch by Aidan Skinner to make third constructor public. This is done so that the BDBMessageStore tests can still run with the addition of the VirtualHost reaper thread. ........ git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@615943 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQQueue.java | 24 +- .../queue/ConcurrentSelectorDeliveryManager.java | 24 + .../apache/qpid/server/queue/DeliveryManager.java | 2 + .../apache/qpid/server/queue/SubscriptionSet.java | 14 +- .../qpid/server/virtualhost/VirtualHost.java | 563 +++++++++++---------- 5 files changed, 360 insertions(+), 267 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 53c36d9718..e1c1de29bd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.configuration.Configured; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.management.Managable; import org.apache.qpid.server.management.ManagedObject; @@ -37,6 +38,8 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; import java.util.List; +import java.util.Timer; +import java.util.TimerTask; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -154,10 +157,7 @@ public class AMQQueue implements Managable, Comparable /** total messages received by the queue since startup. */ public AtomicLong _totalMessagesReceived = new AtomicLong(); - public int compareTo(Object o) - { - return _name.compareTo(((AMQQueue) o).getName()); - } + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -950,4 +950,20 @@ public class AMQQueue implements Managable, Comparable return new QueueEntry(this, amqMessage); } + public int compareTo(Object o) + { + return _name.compareTo(((AMQQueue) o).getName()); + } + + + public void removeExpiredIfNoSubscribers() throws AMQException + { + synchronized(_subscribers.getChangeLock()) + { + if(_subscribers.isEmpty()) + { + _deliveryMgr.removeExpired(); + } + } + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index 3cf2cb9b12..a7cef1b983 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -212,6 +212,30 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager } } + /** + * NOTE : This method should only be called when there are no active subscribers + */ + public void removeExpired() throws AMQException + { + _lock.lock(); + + + for(Iterator iter = _messages.iterator(); iter.hasNext();) + { + QueueEntry entry = iter.next(); + if(entry.expired()) + { + // fixme: Currently we have to update the total byte size here for the data in the queue + _totalMessageSize.addAndGet(-entry.getSize()); + _queue.dequeue(_reapingStoreContext,entry); + iter.remove(); + } + } + + + _lock.unlock(); + } + /** @return the state of the async processor. */ public boolean isProcessingAsync() { diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java index f7f35a9319..1568f58e2e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/DeliveryManager.java @@ -97,4 +97,6 @@ interface DeliveryManager long getOldestMessageArrival(); void subscriberHasPendingResend(boolean hasContent, Subscription subscription, QueueEntry msg); + + void removeExpired() throws AMQException; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b73b8d7e07..b2f8cae8ff 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -37,7 +37,8 @@ class SubscriptionSet implements WeightedSubscriptionManager /** Used to control the round robin delivery of content */ private int _currentSubscriber; - private final Object _subscriptionsChange = new Object(); + + private final Object _changeLock = new Object(); /** Accessor for unit tests. */ @@ -48,7 +49,7 @@ class SubscriptionSet implements WeightedSubscriptionManager public void addSubscriber(Subscription subscription) { - synchronized (_subscriptionsChange) + synchronized (_changeLock) { _subscriptions.add(subscription); } @@ -66,7 +67,7 @@ class SubscriptionSet implements WeightedSubscriptionManager // TODO: possibly need O(1) operation here. Subscription sub = null; - synchronized (_subscriptionsChange) + synchronized (_changeLock) { int subIndex = _subscriptions.indexOf(subscription); @@ -226,4 +227,11 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _subscriptions.size(); } + + + public Object getChangeLock() + { + return _changeLock; + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 8d6a26fdbc..9addf83e01 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -1,260 +1,303 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.virtualhost; - -import javax.management.NotCompliantMBeanException; - -import org.apache.commons.configuration.Configuration; -import org.apache.log4j.Logger; -import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessManagerImpl; -import org.apache.qpid.server.security.access.Accessable; -import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.auth.manager.AuthenticationManager; -import org.apache.qpid.server.configuration.Configurator; -import org.apache.qpid.server.exchange.DefaultExchangeFactory; -import org.apache.qpid.server.exchange.DefaultExchangeRegistry; -import org.apache.qpid.server.exchange.ExchangeFactory; -import org.apache.qpid.server.exchange.ExchangeRegistry; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.server.queue.DefaultQueueRegistry; -import org.apache.qpid.server.queue.QueueRegistry; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.store.MessageStore; - -public class VirtualHost implements Accessable -{ - private static final Logger _logger = Logger.getLogger(VirtualHost.class); - - - private final String _name; - - private QueueRegistry _queueRegistry; - - private ExchangeRegistry _exchangeRegistry; - - private ExchangeFactory _exchangeFactory; - - private MessageStore _messageStore; - - protected VirtualHostMBean _virtualHostMBean; - - private AMQBrokerManagerMBean _brokerMBean; - - private AuthenticationManager _authenticationManager; - - private AccessManager _accessManager; - - - public void setAccessableName(String name) - { - _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" - + name + ") ignored remains :" + getAccessableName()); - } - - public String getAccessableName() - { - return _name; - } - - - /** - * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any - * implementaion of an Exchange MBean should extend this class. - */ - public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost - { - public VirtualHostMBean() throws NotCompliantMBeanException - { - super(ManagedVirtualHost.class, "VirtualHost"); - } - - public String getObjectInstanceName() - { - return _name.toString(); - } - - public String getName() - { - return _name.toString(); - } - - public VirtualHost getVirtualHost() - { - return VirtualHost.this; - } - - - } // End of MBean class - - /** - * Used for testing only - * @param name - * @param store - * @throws Exception - */ - public VirtualHost(String name, MessageStore store) throws Exception - { - this(name, null, store); - } - - /** - * Normal Constructor - * @param name - * @param hostConfig - * @throws Exception - */ - public VirtualHost(String name, Configuration hostConfig) throws Exception - { - this(name, hostConfig, null); - } - - private VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception - { - _name = name; - - _virtualHostMBean = new VirtualHostMBean(); - // This isn't needed to be registered - //_virtualHostMBean.register(); - - _queueRegistry = new DefaultQueueRegistry(this); - _exchangeFactory = new DefaultExchangeFactory(this); - _exchangeFactory.initialise(hostConfig); - _exchangeRegistry = new DefaultExchangeRegistry(this); - - if (store != null) - { - _messageStore = store; - } - else - { - if (hostConfig == null) - { - throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); - } - initialiseMessageStore(hostConfig); - } - - _exchangeRegistry.initialise(); - - _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); - - _accessManager = new AccessManagerImpl(name, hostConfig); - - _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); - _brokerMBean.register(); - } - - private void initialiseMessageStore(Configuration config) throws Exception - { - String messageStoreClass = config.getString("store.class"); - - Class clazz = Class.forName(messageStoreClass); - Object o = clazz.newInstance(); - - if (!(o instanceof MessageStore)) - { - throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + - " does not."); - } - _messageStore = (MessageStore) o; - _messageStore.configure(this, "store", config); - } - - - public T getConfiguredObject(Class instanceType, Configuration config) - { - T instance; - try - { - instance = instanceType.newInstance(); - } - catch (Exception e) - { - _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); - throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); - } - Configurator.configure(instance); - - return instance; - } - - - public String getName() - { - return _name; - } - - public QueueRegistry getQueueRegistry() - { - return _queueRegistry; - } - - public ExchangeRegistry getExchangeRegistry() - { - return _exchangeRegistry; - } - - public ExchangeFactory getExchangeFactory() - { - return _exchangeFactory; - } - - public ApplicationRegistry getApplicationRegistry() - { - throw new UnsupportedOperationException(); - } - - public MessageStore getMessageStore() - { - return _messageStore; - } - - public AuthenticationManager getAuthenticationManager() - { - return _authenticationManager; - } - - public AccessManager getAccessManager() - { - return _accessManager; - } - - public void close() throws Exception - { - if (_messageStore != null) - { - _messageStore.close(); - } - } - - public ManagedObject getBrokerMBean() - { - return _brokerMBean; - } - - public ManagedObject getManagedObject() - { - return _virtualHostMBean; - } -} +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhost; + +import javax.management.NotCompliantMBeanException; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.server.AMQBrokerManagerMBean; +import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; +import org.apache.qpid.server.security.auth.manager.AuthenticationManager; +import org.apache.qpid.server.configuration.Configurator; +import org.apache.qpid.server.exchange.DefaultExchangeFactory; +import org.apache.qpid.server.exchange.DefaultExchangeRegistry; +import org.apache.qpid.server.exchange.ExchangeFactory; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.ManagedObject; +import org.apache.qpid.server.queue.DefaultQueueRegistry; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.AMQException; + +import java.util.Timer; +import java.util.TimerTask; + +public class VirtualHost implements Accessable +{ + private static final Logger _logger = Logger.getLogger(VirtualHost.class); + + + private final String _name; + + private QueueRegistry _queueRegistry; + + private ExchangeRegistry _exchangeRegistry; + + private ExchangeFactory _exchangeFactory; + + private MessageStore _messageStore; + + protected VirtualHostMBean _virtualHostMBean; + + private AMQBrokerManagerMBean _brokerMBean; + + private AuthenticationManager _authenticationManager; + + private AccessManager _accessManager; + + private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true); + + private static final long DEFAULT_HOUSEKEEPING_PERIOD = 30000L; + + public void setAccessableName(String name) + { + _logger.warn("Setting Accessable Name for VirualHost is not allowed. (" + + name + ") ignored remains :" + getAccessableName()); + } + + public String getAccessableName() + { + return _name; + } + + + /** + * Abstract MBean class. This has some of the methods implemented from management intrerface for exchanges. Any + * implementaion of an Exchange MBean should extend this class. + */ + public class VirtualHostMBean extends AMQManagedObject implements ManagedVirtualHost + { + public VirtualHostMBean() throws NotCompliantMBeanException + { + super(ManagedVirtualHost.class, "VirtualHost"); + } + + public String getObjectInstanceName() + { + return _name.toString(); + } + + public String getName() + { + return _name.toString(); + } + + public VirtualHost getVirtualHost() + { + return VirtualHost.this; + } + + + } // End of MBean class + + /** + * Used for testing only + * @param name + * @param store + * @throws Exception + */ + public VirtualHost(String name, MessageStore store) throws Exception + { + this(name, null, store); + } + + /** + * Normal Constructor + * @param name + * @param hostConfig + * @throws Exception + */ + public VirtualHost(String name, Configuration hostConfig) throws Exception + { + this(name, hostConfig, null); + } + + public VirtualHost(String name, Configuration hostConfig, MessageStore store) throws Exception + { + _name = name; + + _virtualHostMBean = new VirtualHostMBean(); + // This isn't needed to be registered + //_virtualHostMBean.register(); + + _queueRegistry = new DefaultQueueRegistry(this); + _exchangeFactory = new DefaultExchangeFactory(this); + _exchangeFactory.initialise(hostConfig); + _exchangeRegistry = new DefaultExchangeRegistry(this); + + if (store != null) + { + _messageStore = store; + } + else + { + if (hostConfig == null) + { + throw new IllegalAccessException("HostConfig and MessageStore cannot be null"); + } + initialiseMessageStore(hostConfig); + } + + _exchangeRegistry.initialise(); + + _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); + + _accessManager = new AccessManagerImpl(name, hostConfig); + + _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); + _brokerMBean.register(); + initialiseHouseKeeping(hostConfig); + } + + private void initialiseHouseKeeping(final Configuration hostConfig) + { + + long period = hostConfig.getLong("housekeeping.expiredMessageCheckPeriod", DEFAULT_HOUSEKEEPING_PERIOD); + + /* add a timer task to iterate over queues, cleaning expired messages from queues with no consumers */ + if(period != 0L) + { + class RemoveExpiredMessagesTask extends TimerTask + { + public void run() + { + for(AMQQueue q : _queueRegistry.getQueues()) + { + + try + { + q.removeExpiredIfNoSubscribers(); + } + catch (AMQException e) + { + _logger.error("Exception in housekeeping for queue: " + q.getName().toString(),e); + throw new RuntimeException(e); + } + } + } + } + + _houseKeepingTimer.scheduleAtFixedRate(new RemoveExpiredMessagesTask(), + period/2, + period); + } + } + + private void initialiseMessageStore(Configuration config) throws Exception + { + String messageStoreClass = config.getString("store.class"); + + Class clazz = Class.forName(messageStoreClass); + Object o = clazz.newInstance(); + + if (!(o instanceof MessageStore)) + { + throw new ClassCastException("Message store class must implement " + MessageStore.class + ". Class " + clazz + + " does not."); + } + _messageStore = (MessageStore) o; + _messageStore.configure(this, "store", config); + } + + + public T getConfiguredObject(Class instanceType, Configuration config) + { + T instance; + try + { + instance = instanceType.newInstance(); + } + catch (Exception e) + { + _logger.error("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor"); + throw new IllegalArgumentException("Unable to instantiate configuration class " + instanceType + " - ensure it has a public default constructor", e); + } + Configurator.configure(instance); + + return instance; + } + + + public String getName() + { + return _name; + } + + public QueueRegistry getQueueRegistry() + { + return _queueRegistry; + } + + public ExchangeRegistry getExchangeRegistry() + { + return _exchangeRegistry; + } + + public ExchangeFactory getExchangeFactory() + { + return _exchangeFactory; + } + + public ApplicationRegistry getApplicationRegistry() + { + throw new UnsupportedOperationException(); + } + + public MessageStore getMessageStore() + { + return _messageStore; + } + + public AuthenticationManager getAuthenticationManager() + { + return _authenticationManager; + } + + public AccessManager getAccessManager() + { + return _accessManager; + } + + public void close() throws Exception + { + if (_messageStore != null) + { + _messageStore.close(); + } + } + + public ManagedObject getBrokerMBean() + { + return _brokerMBean; + } + + public ManagedObject getManagedObject() + { + return _virtualHostMBean; + } +} -- cgit v1.2.1 From cebb9bb85aa61402762e2567be0d6a714b65f8d2 Mon Sep 17 00:00:00 2001 From: Aidan Skinner Date: Fri, 8 Feb 2008 12:52:54 +0000 Subject: QPID-588: change instances of trace() and isTraceEnabled to debug equivalent to support older versions of log4j git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@619868 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 8 ++++---- .../server/handler/BasicConsumeMethodHandler.java | 4 ++-- .../server/handler/BasicRejectMethodHandler.java | 4 ++-- .../queue/ConcurrentSelectorDeliveryManager.java | 24 +++++++++++----------- .../apache/qpid/server/queue/SubscriptionImpl.java | 8 ++++---- 5 files changed, 24 insertions(+), 24 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 4696ec4453..1bf0cd027a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -215,9 +215,9 @@ public class AMQChannel } else { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content header received on channel " + _channelId); + _log.debug(debugIdentity() + "Content header received on channel " + _channelId); } if (ENABLE_JMSXUserID) @@ -252,9 +252,9 @@ public class AMQChannel throw new AMQException("Received content body without previously receiving a JmsPublishBody"); } - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Content body received on channel " + _channelId); + _log.debug(debugIdentity() + "Content body received on channel " + _channelId); } try diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index bb48539f50..ee46b16397 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -77,9 +77,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener currentQueue = _messages.iterator(); @@ -536,9 +536,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager //else the clean up is not required as the message has already been taken for this queue therefore // it was the responsibility of the code that took the message to ensure the _totalMessageSize was updated. - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace("Removed taken message:" + message.debugIdentity()); + _log.debug("Removed taken message:" + message.debugIdentity()); } // try the next message @@ -634,9 +634,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager Queue messageQueue = sub.getNextQueue(_messages); - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + + _log.debug(debugIdentity() + "Async sendNextMessage for sub (" + System.identityHashCode(sub) + ") from queue (" + System.identityHashCode(messageQueue) + ") AMQQueue (" + System.identityHashCode(queue) + ")"); } @@ -662,9 +662,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager // message will be null if we have no messages in the messageQueue. if (entry == null) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); + _log.debug(debugIdentity() + "No messages for Subscriber(" + System.identityHashCode(sub) + ") from queue; (" + System.identityHashCode(messageQueue) + ")"); } return; } @@ -704,9 +704,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager if (messageQueue == sub.getResendQueue()) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "All messages sent from resendQueue for " + sub); + _log.debug(debugIdentity() + "All messages sent from resendQueue for " + sub); } if (messageQueue.isEmpty()) { @@ -918,9 +918,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isTraceEnabled()) + if (_log.isDebugEnabled()) { - _log.trace(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + + _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index e631481cc8..7f0accf052 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -416,9 +416,9 @@ public class SubscriptionImpl implements Subscription } - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); + _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); } return checkFilters(entry); @@ -563,9 +563,9 @@ public class SubscriptionImpl implements Subscription { QueueEntry resent = _resendQueue.poll(); - if (_logger.isTraceEnabled()) + if (_logger.isDebugEnabled()) { - _logger.trace("Removed for resending:" + resent.debugIdentity()); + _logger.debug("Removed for resending:" + resent.debugIdentity()); } resent.release(); -- cgit v1.2.1 From 1df04df23ddda53cda350ddaeec692cb2f7cbed8 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 12 Feb 2008 16:44:59 +0000 Subject: QPID-787 : Allow for quoting of identifiers in selectors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@620858 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/src/main/grammar/SelectorParser.jj | 17 +++++++++++++++++ 1 file changed, 17 insertions(+) (limited to 'java/broker/src') diff --git a/java/broker/src/main/grammar/SelectorParser.jj b/java/broker/src/main/grammar/SelectorParser.jj index adec1b348d..f6a843e080 100644 --- a/java/broker/src/main/grammar/SelectorParser.jj +++ b/java/broker/src/main/grammar/SelectorParser.jj @@ -172,6 +172,7 @@ TOKEN [IGNORE_CASE] : TOKEN [IGNORE_CASE] : { < ID : ["a"-"z", "_", "$"] (["a"-"z","0"-"9","_", "$"])* > + | < QUOTED_ID : "\"" ( ("\"\"") | ~["\""] )* "\"" > } // ---------------------------------------------------------------------------- @@ -589,6 +590,7 @@ String stringLitteral() : PropertyExpression variable() : { Token t; + StringBuffer rc = new StringBuffer(); PropertyExpression left=null; } { @@ -597,6 +599,21 @@ PropertyExpression variable() : { left = new PropertyExpression(t.image); } + | + t = + { + // Decode the sting value. + String image = t.image; + for( int i=1; i < image.length()-1; i++ ) { + char c = image.charAt(i); + if( c == '"' ) + i++; + rc.append(c); + } + return new PropertyExpression(rc.toString()); + } + + ) { return left; -- cgit v1.2.1 From 91dfa2865cb9998a379e099ff58e830b4b1ba8a4 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 13 Feb 2008 18:10:53 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@627552 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 6 ++- .../qpid/server/exchange/DestWildExchange.java | 2 +- .../server/handler/BasicPublishMethodHandler.java | 3 +- .../amqp0_8/ProtocolOutputConverterImpl.java | 18 +++---- .../amqp0_9/ProtocolOutputConverterImpl.java | 63 +++++++++++++--------- .../org/apache/qpid/server/queue/AMQMessage.java | 17 +++++- .../qpid/server/exchange/DestWildExchangeTest.java | 5 ++ .../qpid/server/queue/AMQQueueAlertTest.java | 5 ++ .../qpid/server/queue/AMQQueueMBeanTest.java | 5 ++ 9 files changed, 86 insertions(+), 38 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 1bf0cd027a..10184a79e5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; import org.apache.qpid.server.exchange.MessageRouter; import org.apache.qpid.server.exchange.NoRouteException; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.*; import org.apache.qpid.server.store.MessageStore; @@ -199,11 +200,12 @@ public class AMQChannel _prefetch_HighWaterMark = prefetchCount; } - public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher) throws AMQException + public void setPublishFrame(MessagePublishInfo info, AMQProtocolSession publisher, final Exchange e) throws AMQException { _currentMessage = new AMQMessage(_messageStore.getNewMessageId(), info, _txnContext); _currentMessage.setPublisher(publisher); + _currentMessage.setExchange(e); } public void publishContentHeader(ContentHeaderBody contentHeaderBody, AMQProtocolSession protocolSession) @@ -285,7 +287,7 @@ public class AMQChannel { try { - _exchanges.routeContent(_currentMessage); + _currentMessage.route(); } catch (NoRouteException e) { diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 75be86a387..19172b98f3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -239,7 +239,7 @@ public class DestWildExchange extends AbstractExchange { MessagePublishInfo info = payload.getMessagePublishInfo(); - final AMQShortString routingKey = normalize(info.getRoutingKey()); + final AMQShortString routingKey = info.getRoutingKey(); List queues = getMatchedQueues(routingKey); // if we have no registered queues we have nothing to do diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index 66afc61751..687ec33ba0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -91,7 +91,8 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener 0; } + public void setExchange(final Exchange exchange) + { + _exchange = exchange; + } + + public void route() throws AMQException + { + _exchange.route(this); + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index 8e5879a51e..7e2d56b460 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -589,6 +589,11 @@ public class DestWildExchangeTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + + } + public boolean isImmediate() { return false; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index 81b0ae2213..fbd9e65480 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -242,6 +242,11 @@ public class AMQQueueAlertTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isImmediate() { return immediate; diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index d86c90bdae..e72e1bf1f0 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -234,6 +234,11 @@ public class AMQQueueMBeanTest extends TestCase return null; } + public void setExchange(AMQShortString exchange) + { + //To change body of implemented methods use File | Settings | File Templates. + } + public boolean isImmediate() { return immediate; -- cgit v1.2.1 From bc9cf0d495598b359fc1a67d03f4636ca610c6a9 Mon Sep 17 00:00:00 2001 From: Rupert Smith Date: Wed, 20 Feb 2008 16:04:25 +0000 Subject: QPID-800 : junit toolkit sources added. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629518 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/plugins/PluginManager.java | 4 +--- .../main/java/org/apache/qpid/server/txn/TransactionalContext.java | 1 - 2 files changed, 1 insertion(+), 4 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java index be22a90d0b..9191ecf6ed 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/plugins/PluginManager.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.plugins; import java.io.File; import java.util.ArrayList; -import java.util.Collection; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -30,7 +29,6 @@ import org.apache.felix.framework.Felix; import org.apache.felix.framework.cache.BundleCache; import org.apache.felix.framework.util.FelixConstants; import org.apache.felix.framework.util.StringMap; -import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.exchange.ExchangeType; import org.osgi.framework.BundleActivator; import org.osgi.framework.BundleException; @@ -72,7 +70,7 @@ public class PluginManager "org.apache.qpid.server.queue; version=0.2.1," + "javax.management.openmbean; version=1.0.0,"+ "javax.management; version=1.0.0,"+ - "uk.co.thebadgerset.junit.extensions.util; version=0.6.1," + "org.apache.qpid.junit.extensions.util; version=0.6.1," ); if (plugindir == null) diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java index b4c66aa24d..6016ecc1a5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java @@ -24,7 +24,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQMessage; -import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.StoreContext; -- cgit v1.2.1 From 3047c0ec2d581f4b51c77fec84fbf0bec8599573 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 21 Feb 2008 10:09:03 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629731 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 27 +- .../src/main/java/org/apache/qpid/server/Main.java | 3 +- .../qpid/server/exchange/DestNameExchange.java | 7 +- .../qpid/server/exchange/DestWildExchange.java | 343 ++++++++++++++------- .../qpid/server/exchange/FanoutExchange.java | 7 +- .../qpid/server/handler/ChannelOpenHandler.java | 5 +- .../amqp0_8/ProtocolOutputConverterImpl.java | 15 +- .../amqp0_9/ProtocolOutputConverterImpl.java | 153 +++++++-- .../server/protocol/AMQMinaProtocolSession.java | 36 +-- .../server/protocol/AMQPFastProtocolHandler.java | 3 +- .../org/apache/qpid/server/queue/AMQMessage.java | 11 +- .../org/apache/qpid/server/queue/AMQQueue.java | 75 ++++- .../apache/qpid/server/queue/AMQQueueMBean.java | 25 +- .../queue/ConcurrentSelectorDeliveryManager.java | 5 +- .../qpid/server/queue/NotificationCheck.java | 4 +- .../apache/qpid/server/queue/SubscriptionImpl.java | 113 +++---- .../apache/qpid/server/queue/SubscriptionSet.java | 81 +++-- .../qpid/server/queue/TransientMessageData.java | 10 +- .../qpid/server/txn/NonTransactionalContext.java | 4 +- .../qpid/server/virtualhost/VirtualHost.java | 3 +- .../qpid/server/queue/AMQQueueAlertTest.java | 2 +- .../qpid/server/queue/AMQQueueMBeanTest.java | 2 +- 22 files changed, 624 insertions(+), 310 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 10184a79e5..3cb50d1d12 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -75,7 +75,7 @@ public class AMQChannel * The delivery tag is unique per channel. This is pre-incremented before putting into the deliver frame so that * value of this represents the last tag sent out */ - private AtomicLong _deliveryTag = new AtomicLong(0); + private long _deliveryTag = 0; /** A channel has a default queue (the last declared) that is used when no queue name is explictily set */ private AMQQueue _defaultQueue; @@ -99,8 +99,6 @@ public class AMQChannel private final AtomicBoolean _suspended = new AtomicBoolean(false); - private final MessageRouter _exchanges; - private TransactionalContext _txnContext, _nonTransactedContext; /** @@ -124,7 +122,7 @@ public class AMQChannel public boolean ENABLE_JMSXUserID; - public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore, MessageRouter exchanges) + public AMQChannel(AMQProtocolSession session, int channelId, MessageStore messageStore) throws AMQException { //Set values from configuration @@ -136,7 +134,7 @@ public class AMQChannel _prefetch_HighWaterMark = DEFAULT_PREFETCH; _prefetch_LowWaterMark = _prefetch_HighWaterMark / 2; _messageStore = messageStore; - _exchanges = exchanges; + // by default the session is non-transactional _txnContext = new NonTransactionalContext(_messageStore, _storeContext, this, _returnMessages, _browsedAcks); } @@ -297,7 +295,7 @@ public class AMQChannel public long getNextDeliveryTag() { - return _deliveryTag.incrementAndGet(); + return ++_deliveryTag; } public int getNextConsumerTag() @@ -969,16 +967,19 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - for (RequiredDeliveryException bouncedMessage : _returnMessages) + if(!_returnMessages.isEmpty()) { - AMQMessage message = bouncedMessage.getAMQMessage(); - session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), - new AMQShortString(bouncedMessage.getMessage())); + for (RequiredDeliveryException bouncedMessage : _returnMessages) + { + AMQMessage message = bouncedMessage.getAMQMessage(); + session.getProtocolOutputConverter().writeReturn(message, _channelId, bouncedMessage.getReplyCode().getCode(), + new AMQShortString(bouncedMessage.getMessage())); - message.decrementReference(_storeContext); - } + message.decrementReference(_storeContext); + } - _returnMessages.clear(); + _returnMessages.clear(); + } } public boolean wouldSuspend(AMQMessage msg) diff --git a/java/broker/src/main/java/org/apache/qpid/server/Main.java b/java/broker/src/main/java/org/apache/qpid/server/Main.java index ab9f40b31d..d8a8cfb6d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/Main.java +++ b/java/broker/src/main/java/org/apache/qpid/server/Main.java @@ -35,6 +35,7 @@ import org.apache.log4j.xml.DOMConfigurator; import org.apache.mina.common.ByteBuffer; import org.apache.mina.common.IoAcceptor; import org.apache.mina.common.SimpleByteBufferAllocator; +import org.apache.mina.common.FixedSizeByteBufferAllocator; import org.apache.mina.transport.socket.nio.SocketAcceptorConfig; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.qpid.AMQException; @@ -275,7 +276,7 @@ public class Main // once more testing of the performance of the simple allocator has been done if (!connectorConfig.enablePooledAllocator) { - ByteBuffer.setAllocator(new SimpleByteBufferAllocator()); + ByteBuffer.setAllocator(new FixedSizeByteBufferAllocator()); } int port = connectorConfig.port; diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java index b6b6ee39ce..12347c0278 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestNameExchange.java @@ -212,10 +212,9 @@ public class DestNameExchange extends AbstractExchange _logger.debug("Publishing message to queue " + queues); } - for (AMQQueue q : queues) - { - payload.enqueue(q); - } + payload.enqueue(queues); + + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index 19172b98f3..dbe7a8938a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -26,6 +26,7 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.AMQShortStringTokenizer; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.management.MBeanConstructor; import org.apache.qpid.server.management.MBeanDescription; @@ -40,11 +41,7 @@ import javax.management.openmbean.CompositeDataSupport; import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; -import java.util.ArrayList; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; -import java.util.StringTokenizer; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; @@ -84,12 +81,21 @@ public class DestWildExchange extends AbstractExchange private static final Logger _logger = Logger.getLogger(DestWildExchange.class); - private ConcurrentHashMap> _routingKey2queues = + private final ConcurrentHashMap> _bindingKey2queues = + new ConcurrentHashMap>(); + private final ConcurrentHashMap> _simpleBindingKey2queues = + new ConcurrentHashMap>(); + private final ConcurrentHashMap> _wildCardBindingKey2queues = new ConcurrentHashMap>(); // private ConcurrentHashMap _routingKey2queue = new ConcurrentHashMap(); - private static final String TOPIC_SEPARATOR = "."; - private static final String AMQP_STAR = "*"; - private static final String AMQP_HASH = "#"; + private static final byte TOPIC_SEPARATOR = (byte)'.'; + private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); + private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); + private static final AMQShortString AMQP_HASH_TOKEN = new AMQShortString("#"); + private ConcurrentHashMap _bindingKey2Tokenized = + new ConcurrentHashMap(); + private static final byte HASH_BYTE = (byte)'#'; + private static final byte STAR_BYTE = (byte)'*'; /** DestWildExchangeMBean class implements the management interface for the Topic exchanges. */ @MBeanDescription("Management Bean for Topic Exchange") @@ -107,7 +113,7 @@ public class DestWildExchange extends AbstractExchange public TabularData bindings() throws OpenDataException { _bindingList = new TabularDataSupport(_bindinglistDataType); - for (Map.Entry> entry : _routingKey2queues.entrySet()) + for (Map.Entry> entry : _bindingKey2queues.entrySet()) { AMQShortString key = entry.getKey(); List queueList = new ArrayList(); @@ -156,27 +162,75 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); + _logger.debug("Registering queue " + queue.getName() + " with routing key " + rKey); - _logger.debug("Registering queue " + queue.getName() + " with routing key " + routingKey); // we need to use putIfAbsent, which is an atomic operation, to avoid a race condition - List queueList = _routingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList()); + List queueList = _bindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList()); + + + + + + + // if we got null back, no previous value was associated with the specified routing key hence // we need to read back the new value just put into the map if (queueList == null) { - queueList = _routingKey2queues.get(routingKey); + queueList = _bindingKey2queues.get(rKey); } + + if (!queueList.contains(queue)) { queueList.add(queue); + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString routingKey = normalize(rKey); + List queueList2 = _wildCardBindingKey2queues.putIfAbsent(routingKey, new CopyOnWriteArrayList()); + + if(queueList2 == null) + { + queueList2 = _wildCardBindingKey2queues.get(routingKey); + AMQShortStringTokenizer keyTok = routingKey.tokenize(TOPIC_SEPARATOR); + + ArrayList keyTokList = new ArrayList(keyTok.countTokens()); + + while (keyTok.hasMoreTokens()) + { + keyTokList.add(keyTok.nextToken()); + } + + _bindingKey2Tokenized.put(routingKey, keyTokList.toArray(new AMQShortString[keyTokList.size()])); + } + queueList2.add(queue); + + } + else + { + List queueList2 = _simpleBindingKey2queues.putIfAbsent(rKey, new CopyOnWriteArrayList()); + if(queueList2 == null) + { + queueList2 = _simpleBindingKey2queues.get(rKey); + } + queueList2.add(queue); + + } + + + + } else if (_logger.isDebugEnabled()) { - _logger.debug("Queue " + queue + " is already registered with routing key " + routingKey); + _logger.debug("Queue " + queue + " is already registered with routing key " + rKey); } + + } private AMQShortString normalize(AMQShortString routingKey) @@ -186,53 +240,58 @@ public class DestWildExchange extends AbstractExchange routingKey = AMQShortString.EMPTY_STRING; } - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - List _subscription = new ArrayList(); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + List subscriptionList = new ArrayList(); while (routingTokens.hasMoreTokens()) { - _subscription.add(routingTokens.nextToken()); + subscriptionList.add(routingTokens.nextToken()); } - int size = _subscription.size(); + int size = subscriptionList.size(); for (int index = 0; index < size; index++) { // if there are more levels if ((index + 1) < size) { - if (_subscription.get(index).equals(AMQP_HASH)) + if (subscriptionList.get(index).equals(AMQP_HASH_TOKEN)) { - if (_subscription.get(index + 1).equals(AMQP_HASH)) + if (subscriptionList.get(index + 1).equals(AMQP_HASH_TOKEN)) { // we don't need #.# delete this one - _subscription.remove(index); + subscriptionList.remove(index); size--; // redo this normalisation index--; } - if (_subscription.get(index + 1).equals(AMQP_STAR)) + if (subscriptionList.get(index + 1).equals(AMQP_STAR_TOKEN)) { // we don't want #.* swap to *.# // remove it and put it in at index + 1 - _subscription.add(index + 1, _subscription.remove(index)); + subscriptionList.add(index + 1, subscriptionList.remove(index)); } } } // if we have more levels } - StringBuilder sb = new StringBuilder(); - for (String s : _subscription) + + AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); +/* + StringBuilder sb = new StringBuilder(); + for (AMQShortString s : subscriptionList) { sb.append(s); sb.append(TOPIC_SEPARATOR); } sb.deleteCharAt(sb.length() - 1); +*/ - return new AMQShortString(sb.toString()); + return normalizedString; } public void route(AMQMessage payload) throws AMQException @@ -254,19 +313,14 @@ public class DestWildExchange extends AbstractExchange else { _logger.warn("No queues found for routing key " + routingKey); - _logger.warn("Routing map contains: " + _routingKey2queues); + _logger.warn("Routing map contains: " + _bindingKey2queues); return; } } - for (AMQQueue q : queues) - { - // TODO: modify code generator to add clone() method then clone the deliver body - // without this addition we have a race condition - we will be modifying the body - // before the encoder has encoded the body for delivery - payload.enqueue(q); - } + payload.enqueue(queues); + } public boolean isBound(AMQShortString routingKey, FieldTable arguments, AMQQueue queue) @@ -276,21 +330,21 @@ public class DestWildExchange extends AbstractExchange public boolean isBound(AMQShortString routingKey, AMQQueue queue) { - List queues = _routingKey2queues.get(normalize(routingKey)); + List queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && queues.contains(queue); } public boolean isBound(AMQShortString routingKey) { - List queues = _routingKey2queues.get(normalize(routingKey)); + List queues = _bindingKey2queues.get(normalize(routingKey)); return (queues != null) && !queues.isEmpty(); } public boolean isBound(AMQQueue queue) { - for (List queues : _routingKey2queues.values()) + for (List queues : _bindingKey2queues.values()) { if (queues.contains(queue)) { @@ -303,7 +357,7 @@ public class DestWildExchange extends AbstractExchange public boolean hasBindings() { - return !_routingKey2queues.isEmpty(); + return !_bindingKey2queues.isEmpty(); } public synchronized void deregisterQueue(AMQShortString rKey, AMQQueue queue, FieldTable args) throws AMQException @@ -311,13 +365,11 @@ public class DestWildExchange extends AbstractExchange assert queue != null; assert rKey != null; - AMQShortString routingKey = normalize(rKey); - - List queues = _routingKey2queues.get(routingKey); + List queues = _bindingKey2queues.get(rKey); if (queues == null) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey + ". No queue was registered with that _routing key"); + + " with routing key " + rKey + ". No queue was registered with that _routing key"); } @@ -325,12 +377,39 @@ public class DestWildExchange extends AbstractExchange if (!removedQ) { throw new AMQException(AMQConstant.NOT_FOUND, "Queue " + queue + " was not registered with exchange " + this.getName() - + " with routing key " + routingKey); + + " with routing key " + rKey); + } + + + if(rKey.contains(HASH_BYTE) || rKey.contains(STAR_BYTE)) + { + AMQShortString bindingKey = normalize(rKey); + List queues2 = _wildCardBindingKey2queues.get(bindingKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _wildCardBindingKey2queues.remove(bindingKey); + _bindingKey2Tokenized.remove(bindingKey); + } + } + else + { + List queues2 = _simpleBindingKey2queues.get(rKey); + queues2.remove(queue); + if(queues2.isEmpty()) + { + _simpleBindingKey2queues.remove(rKey); + } + + } + + + if (queues.isEmpty()) { - _routingKey2queues.remove(routingKey); + _bindingKey2queues.remove(rKey); } } @@ -349,117 +428,167 @@ public class DestWildExchange extends AbstractExchange public Map> getBindings() { - return _routingKey2queues; + return _bindingKey2queues; } private List getMatchedQueues(AMQShortString routingKey) { - List list = new LinkedList(); - StringTokenizer routingTokens = new StringTokenizer(routingKey.toString(), TOPIC_SEPARATOR); - ArrayList routingkeyList = new ArrayList(); + List list = null; - while (routingTokens.hasMoreTokens()) + if(!_wildCardBindingKey2queues.isEmpty()) { - String next = routingTokens.nextToken(); - if (next.equals(AMQP_HASH) && routingkeyList.get(routingkeyList.size() - 1).equals(AMQP_HASH)) - { - continue; - } - routingkeyList.add(next); - } - for (AMQShortString queue : _routingKey2queues.keySet()) - { - StringTokenizer queTok = new StringTokenizer(queue.toString(), TOPIC_SEPARATOR); + AMQShortStringTokenizer routingTokens = routingKey.tokenize(TOPIC_SEPARATOR); + + final int routingTokensCount = routingTokens.countTokens(); - ArrayList queueList = new ArrayList(); - while (queTok.hasMoreTokens()) + AMQShortString[] routingkeyTokens = new AMQShortString[routingTokensCount]; + + if(routingTokensCount == 1) { - queueList.add(queTok.nextToken()); + routingkeyTokens[0] =routingKey; } + else + { - int depth = 0; - boolean matching = true; - boolean done = false; - int routingskip = 0; - int queueskip = 0; - while (matching && !done) - { - if ((queueList.size() == (depth + queueskip)) || (routingkeyList.size() == (depth + routingskip))) + int token = 0; + while (routingTokens.hasMoreTokens()) { - done = true; - // if it was the routing key that ran out of digits - if (routingkeyList.size() == (depth + routingskip)) - { - if (queueList.size() > (depth + queueskip)) - { // a hash and it is the last entry - matching = - queueList.get(depth + queueskip).equals(AMQP_HASH) - && (queueList.size() == (depth + queueskip + 1)); - } - } - else if (routingkeyList.size() > (depth + routingskip)) + AMQShortString next = routingTokens.nextToken(); + /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH)) { - // There is still more routing key to check - matching = false; + continue; } + */ - continue; + routingkeyTokens[token++] = next; } + } + for (AMQShortString bindingKey : _wildCardBindingKey2queues.keySet()) + { + + AMQShortString[] bindingKeyTokens = _bindingKey2Tokenized.get(bindingKey); + + + boolean matching = true; + boolean done = false; - // if the values on the two topics don't match - if (!queueList.get(depth + queueskip).equals(routingkeyList.get(depth + routingskip))) + int depthPlusRoutingSkip = 0; + int depthPlusQueueSkip = 0; + + final int bindingKeyTokensCount = bindingKeyTokens.length; + + while (matching && !done) { - if (queueList.get(depth + queueskip).equals(AMQP_STAR)) + + if ((bindingKeyTokensCount == depthPlusQueueSkip) || (routingTokensCount == depthPlusRoutingSkip)) { - depth++; + done = true; + + // if it was the routing key that ran out of digits + if (routingTokensCount == depthPlusRoutingSkip) + { + if (bindingKeyTokensCount > depthPlusQueueSkip) + { // a hash and it is the last entry + matching = + bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN) + && (bindingKeyTokensCount == (depthPlusQueueSkip + 1)); + } + } + else if (routingTokensCount > depthPlusRoutingSkip) + { + // There is still more routing key to check + matching = false; + } continue; } - else if (queueList.get(depth + queueskip).equals(AMQP_HASH)) + + // if the values on the two topics don't match + if (!bindingKeyTokens[depthPlusQueueSkip].equals(routingkeyTokens[depthPlusRoutingSkip])) { - // Is this a # at the end - if (queueList.size() == (depth + queueskip + 1)) + if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_STAR_TOKEN)) { - done = true; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; continue; } - - // otherwise # in the middle - while (routingkeyList.size() > (depth + routingskip)) + else if (bindingKeyTokens[depthPlusQueueSkip].equals(AMQP_HASH_TOKEN)) { - if (routingkeyList.get(depth + routingskip).equals(queueList.get(depth + queueskip + 1))) + // Is this a # at the end + if (bindingKeyTokensCount == (depthPlusQueueSkip + 1)) + { + done = true; + + continue; + } + + // otherwise # in the middle + while (routingTokensCount > depthPlusRoutingSkip) { - queueskip++; - depth++; + if (routingkeyTokens[depthPlusRoutingSkip].equals(bindingKeyTokens[depthPlusQueueSkip + 1])) + { + depthPlusQueueSkip += 2; + depthPlusRoutingSkip++; + + break; + } - break; + depthPlusRoutingSkip++; } - routingskip++; + continue; } - continue; + matching = false; } - matching = false; + depthPlusQueueSkip++; + depthPlusRoutingSkip++; } - depth++; + if (matching) + { + if(list == null) + { + list = new ArrayList(_wildCardBindingKey2queues.get(bindingKey)); + } + else + { + list.addAll(_wildCardBindingKey2queues.get(bindingKey)); + } + } } - if (matching) + } + if(!_simpleBindingKey2queues.isEmpty()) + { + List queues = _simpleBindingKey2queues.get(routingKey); + if(list == null) + { + if(queues == null) + { + list = Collections.EMPTY_LIST; + } + else + { + list = new ArrayList(queues); + } + } + else if(queues != null) { - list.addAll(_routingKey2queues.get(queue)); + list.addAll(queues); } + } return list; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index 57ae2bb6d4..e7c887f306 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -42,6 +42,7 @@ import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; import java.util.List; import java.util.Map; +import java.util.ArrayList; import java.util.concurrent.CopyOnWriteArraySet; public class FanoutExchange extends AbstractExchange @@ -205,10 +206,8 @@ public class FanoutExchange extends AbstractExchange _logger.debug("Publishing message to queue " + _queues); } - for (AMQQueue q : _queues) - { - payload.enqueue(q); - } + payload.enqueue(new ArrayList(_queues)); + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java index 3f604480b9..054674aed4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java @@ -29,7 +29,6 @@ import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -55,8 +54,8 @@ public class ChannelOpenHandler implements StateAwareMethodListener bodyFrameIterator = message.getBodyFrameIterator(getProtocolSession(), channelId); // @@ -236,14 +274,13 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter if (bodyFrameIterator.hasNext()) { AMQDataBlock firstContentBody = bodyFrameIterator.next(); - AMQDataBlock[] headerAndFirstContent = new AMQDataBlock[]{contentHeader, firstContentBody}; - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, headerAndFirstContent); + AMQDataBlock[] blocks = new AMQDataBlock[]{returnFrame, contentHeader, firstContentBody}; + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(blocks); writeFrame(compositeBlock); } else { - CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(returnFrame, - new AMQDataBlock[]{contentHeader}); + CompositeAMQDataBlock compositeBlock = new CompositeAMQDataBlock(new AMQDataBlock[]{returnFrame, contentHeader}); writeFrame(compositeBlock); } @@ -272,4 +309,64 @@ public class ProtocolOutputConverterImpl implements ProtocolOutputConverter writeFrame(basicCancelOkBody.generateFrame(channelId)); } + + + public static final class CompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final AMQBody _contentBody; + private final int _channel; + + + public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + _contentBody = contentBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize(); + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody); + } + } + + public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock + { + public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead(); + + private final AMQBody _methodBody; + private final AMQBody _headerBody; + private final int _channel; + + + public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody) + { + _channel = channel; + _methodBody = methodBody; + _headerBody = headerBody; + + } + + public long getSize() + { + return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ; + } + + public void writePayload(ByteBuffer buffer) + { + AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody); + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 0fe6d3636e..143ee5fa40 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -208,27 +208,10 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _logger.debug("Frame Received: " + frame); } + + + body.handle(channelId, this); - if (body instanceof AMQMethodBody) - { - methodFrameReceived(channelId, (AMQMethodBody) body); - } - else if (body instanceof ContentHeaderBody) - { - contentHeaderReceived(channelId, (ContentHeaderBody) body); - } - else if (body instanceof ContentBody) - { - contentBodyReceived(channelId, (ContentBody) body); - } - else if (body instanceof HeartbeatBody) - { - // NO OP - } - else - { - _logger.warn("Unrecognised frame " + frame.getClass().getName()); - } } private void protocolInitiationReceived(ProtocolInitiation pi) @@ -271,7 +254,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void methodFrameReceived(int channelId, AMQMethodBody methodBody) + public void methodFrameReceived(int channelId, AMQMethodBody methodBody) { final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); @@ -365,7 +348,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - _stateManager.error(e); + for (AMQMethodListener listener : _frameListeners) { listener.error(e); @@ -375,7 +358,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } } - private void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException + public void contentHeaderReceived(int channelId, ContentHeaderBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); @@ -384,13 +367,18 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } - private void contentBodyReceived(int channelId, ContentBody body) throws AMQException + public void contentBodyReceived(int channelId, ContentBody body) throws AMQException { AMQChannel channel = getAndAssertChannel(channelId); channel.publishContentBody(body, this); } + public void heartbeatBodyReceived(int channelId, HeartbeatBody body) + { + // NO - OP + } + /** * Convenience method that writes a frame to the protocol session. Equivalent to calling * getProtocolSession().write(). diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index 543e043bed..db5d882f51 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -30,6 +30,7 @@ import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; import org.apache.mina.filter.codec.ProtocolCodecFilter; +import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; @@ -82,7 +83,7 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter createSession(protocolSession, _applicationRegistry, codecFactory); _logger.info("Protocol session created for:" + protocolSession.getRemoteAddress()); - final ProtocolCodecFilter pcf = new ProtocolCodecFilter(codecFactory); + final QpidProtocolCodecFilter pcf = new QpidProtocolCodecFilter(codecFactory); ConnectorConfiguration connectorConfig = ApplicationRegistry.getInstance(). getConfiguredObject(ConnectorConfiguration.class); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 80158779b2..5e79ab46b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -83,7 +83,7 @@ public class AMQMessage private long _expiration; - private final int hashcode = System.identityHashCode(this); + private Exchange _exchange; private static final boolean SYNCED_CLOCKS = @@ -92,7 +92,7 @@ public class AMQMessage public String debugIdentity() { - return "(HC:" + hashcode + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; + return "(HC:" + System.identityHashCode(this) + " ID:" + _messageId + " Ref:" + _referenceCount.get() + ")"; } public void setExpiration() @@ -141,6 +141,11 @@ public class AMQMessage _exchange.route(this); } + public void enqueue(final List queues) + { + _transientMessageData.setDestinationQueues(queues); + } + /** * Used to iterate through all the body frames associated with this message. Will not keep all the data in memory * therefore is memory-efficient. @@ -664,7 +669,7 @@ public class AMQMessage } finally { - destinationQueues.clear(); + // Remove refence for routing process . Reference count should now == delivered queue count decrementReference(storeContext); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index e1c1de29bd..4a0121700c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -37,9 +37,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; import javax.management.JMException; import java.text.MessageFormat; -import java.util.List; -import java.util.Timer; -import java.util.TimerTask; +import java.util.*; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.Executor; import java.util.concurrent.atomic.AtomicBoolean; @@ -158,6 +156,8 @@ public class AMQQueue implements Managable, Comparable public AtomicLong _totalMessagesReceived = new AtomicLong(); + private final Set _notificationChecks = EnumSet.noneOf(NotificationCheck.class); + public AMQQueue(AMQShortString name, boolean durable, AMQShortString owner, boolean autoDelete, VirtualHost virtualHost) throws AMQException @@ -200,6 +200,13 @@ public class AMQQueue implements Managable, Comparable _subscribers = subscribers; _subscriptionFactory = subscriptionFactory; _deliveryMgr = new ConcurrentSelectorDeliveryManager(_subscribers, this); + + // This ensure that the notification checks for the configured alerts are created. + setMaximumMessageAge(_maximumMessageAge); + setMaximumMessageCount(_maximumMessageCount); + setMaximumMessageSize(_maximumMessageSize); + setMaximumQueueDepth(_maximumQueueDepth); + } private AMQQueueMBean createMBean() throws AMQException @@ -214,7 +221,7 @@ public class AMQQueue implements Managable, Comparable } } - public AMQShortString getName() + public final AMQShortString getName() { return _name; } @@ -540,9 +547,17 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageSize; } - public void setMaximumMessageSize(long value) + public void setMaximumMessageSize(final long maximumMessageSize) { - _maximumMessageSize = value; + _maximumMessageSize = maximumMessageSize; + if(maximumMessageSize == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_SIZE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_SIZE_ALERT); + } } public int getConsumerCount() @@ -565,9 +580,20 @@ public class AMQQueue implements Managable, Comparable return _maximumMessageCount; } - public void setMaximumMessageCount(long value) + public void setMaximumMessageCount(final long maximumMessageCount) { - _maximumMessageCount = value; + _maximumMessageCount = maximumMessageCount; + if(maximumMessageCount == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_COUNT_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_COUNT_ALERT); + } + + + } public long getMaximumQueueDepth() @@ -576,9 +602,18 @@ public class AMQQueue implements Managable, Comparable } // Sets the queue depth, the max queue size - public void setMaximumQueueDepth(long value) + public void setMaximumQueueDepth(final long maximumQueueDepth) { - _maximumQueueDepth = value; + _maximumQueueDepth = maximumQueueDepth; + if(maximumQueueDepth == 0L) + { + _notificationChecks.remove(NotificationCheck.QUEUE_DEPTH_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.QUEUE_DEPTH_ALERT); + } + } public long getOldestMessageArrivalTime() @@ -661,6 +696,10 @@ public class AMQQueue implements Managable, Comparable } _subscribers.addSubscriber(subscription); + if(exclusive) + { + _subscribers.setExclusive(true); + } } private boolean isExclusive() @@ -692,6 +731,7 @@ public class AMQQueue implements Managable, Comparable ps, channel, consumerTag, this)); } + _subscribers.setExclusive(false); Subscription removedSubscription; if ((removedSubscription = _subscribers.removeSubscriber(_subscriptionFactory.createSubscription(channel, ps, consumerTag))) @@ -805,7 +845,7 @@ public class AMQQueue implements Managable, Comparable public void process(StoreContext storeContext, QueueEntry entry, boolean deliverFirst) throws AMQException { AMQMessage msg = entry.getMessage(); - _deliveryMgr.deliver(storeContext, getName(), entry, deliverFirst); + _deliveryMgr.deliver(storeContext, _name, entry, deliverFirst); try { msg.checkDeliveredToConsumer(); @@ -938,6 +978,14 @@ public class AMQQueue implements Managable, Comparable public void setMaximumMessageAge(long maximumMessageAge) { _maximumMessageAge = maximumMessageAge; + if(maximumMessageAge == 0L) + { + _notificationChecks.remove(NotificationCheck.MESSAGE_AGE_ALERT); + } + else + { + _notificationChecks.add(NotificationCheck.MESSAGE_AGE_ALERT); + } } public void subscriberHasPendingResend(boolean hasContent, SubscriptionImpl subscription, QueueEntry entry) @@ -966,4 +1014,9 @@ public class AMQQueue implements Managable, Comparable } } } + + public final Set getNotificationChecks() + { + return _notificationChecks; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 9e32de3f76..348a136f9d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -54,10 +54,7 @@ import javax.management.openmbean.TabularDataSupport; import javax.management.openmbean.TabularType; import java.text.SimpleDateFormat; -import java.util.ArrayList; -import java.util.Date; -import java.util.Iterator; -import java.util.List; +import java.util.*; /** * AMQQueueMBean is the management bean for an {@link AMQQueue}. @@ -97,6 +94,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que private final long[] _lastNotificationTimes = new long[NotificationCheck.values().length]; private Notification _lastNotification = null; + + + @MBeanConstructor("Creates an MBean exposing an AMQQueue") public AMQQueueMBean(AMQQueue queue) throws JMException { @@ -249,16 +249,21 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que public void checkForNotification(AMQMessage msg) throws AMQException, JMException { - final long currentTime = System.currentTimeMillis(); - final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + final Set notificationChecks = _queue.getNotificationChecks(); - for (NotificationCheck check : NotificationCheck.values()) + if(!notificationChecks.isEmpty()) { - if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) + final long currentTime = System.currentTimeMillis(); + final long thresholdTime = currentTime - _queue.getMinimumAlertRepeatGap(); + + for (NotificationCheck check : notificationChecks) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - _lastNotificationTimes[check.ordinal()] = currentTime; + if (check.notifyIfNecessary(msg, _queue, this)) + { + _lastNotificationTimes[check.ordinal()] = currentTime; + } } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index e3c8d3f17a..a61d41e33b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -363,8 +363,9 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager protocolSession.getProtocolOutputConverter().writeGetOk(entry.getMessage(), channel.getChannelId(), deliveryTag, _queue.getMessageCount()); - _totalMessageSize.addAndGet(-entry.getSize()); + } + _totalMessageSize.addAndGet(-entry.getSize()); if (!acks) { @@ -918,7 +919,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { if (!s.isSuspended()) { - if (_log.isDebugEnabled()) + if (debugEnabled) { _log.debug(debugIdentity() + "Delivering Message:" + entry.getMessage().debugIdentity() + " to(" + System.identityHashCode(s) + ") :" + s); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index 6b3d65661f..6f9efd3200 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -29,9 +29,9 @@ public enum NotificationCheck { boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener) { - int msgCount = queue.getMessageCount(); + int msgCount; final long maximumMessageCount = queue.getMaximumMessageCount(); - if (maximumMessageCount!= 0 && msgCount >= maximumMessageCount) + if (maximumMessageCount!= 0 && (msgCount = queue.getMessageCount()) >= maximumMessageCount) { listener.notifyClients(this, queue, msgCount + ": Maximum count on queue threshold ("+ maximumMessageCount +") breached."); return true; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 7f0accf052..6e68b5637e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -292,14 +292,17 @@ public class SubscriptionImpl implements Subscription queue.dequeue(storeContext, entry); } +/* + if (_sendLock.get()) + { + _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); + } +*/ + synchronized (channel) { long deliveryTag = channel.getNextDeliveryTag(); - if (_sendLock.get()) - { - _logger.error("Sending " + entry + " when subscriber(" + this + ") is closed!"); - } if (_acks) { @@ -308,10 +311,11 @@ public class SubscriptionImpl implements Subscription protocolSession.getProtocolOutputConverter().writeDeliver(entry.getMessage(), channel.getChannelId(), deliveryTag, consumerTag); - if (!_acks) - { - entry.getMessage().decrementReference(storeContext); - } + + } + if (!_acks) + { + entry.getMessage().decrementReference(storeContext); } } finally @@ -367,59 +371,60 @@ public class SubscriptionImpl implements Subscription // return false; } - final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + //todo - client id should be recoreded and this test removed but handled below - if (_noLocal && publisher != null) + if (_noLocal) { - // We don't want local messages so check to see if message is one we sent - Object localInstance; - Object msgInstance; - if ((protocolSession.getClientProperties() != null) && - (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + final AMQProtocolSession publisher = entry.getMessage().getPublisher(); + if(publisher != null) + { + // We don't want local messages so check to see if message is one we sent + Object localInstance; + Object msgInstance; - if ((publisher.getClientProperties() != null) && - (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) + if ((protocolSession.getClientProperties() != null) && + (localInstance = protocolSession.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { - if (localInstance == msgInstance || localInstance.equals(msgInstance)) + + if ((publisher.getClientProperties() != null) && + (msgInstance = publisher.getClientProperties().getObject(CLIENT_PROPERTIES_INSTANCE)) != null) { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + if (localInstance == msgInstance || localInstance.equals(msgInstance)) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } } - } - else - { + else + { - localInstance = protocolSession.getClientIdentifier(); - //todo - client id should be recoreded and this test removed but handled here + localInstance = protocolSession.getClientIdentifier(); + //todo - client id should be recoreded and this test removed but handled here - msgInstance = publisher.getClientIdentifier(); - if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + -// msg.debugIdentity() + ")"); -// } - return false; + msgInstance = publisher.getClientIdentifier(); + if (localInstance == msgInstance || ((localInstance != null) && localInstance.equals(msgInstance))) + { + // if (_logger.isTraceEnabled()) + // { + // _logger.trace("(" + debugIdentity() + ") has no interest as it is a local message(" + + // msg.debugIdentity() + ")"); + // } + return false; + } } - } - + } } - if (_logger.isDebugEnabled()) - { - _logger.debug("(" + debugIdentity() + ") checking filters for message (" + entry.debugIdentity()); - } return checkFilters(entry); } @@ -433,23 +438,7 @@ public class SubscriptionImpl implements Subscription private boolean checkFilters(QueueEntry msg) { - if (_filters != null) - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has filters."); -// } - return _filters.allAllow(msg.getMessage()); - } - else - { -// if (_logger.isTraceEnabled()) -// { -// _logger.trace("(" + debugIdentity() + ") has no filters"); -// } - - return true; - } + return (_filters == null) || _filters.allAllow(msg.getMessage()); } public Queue getPreDeliveryQueue() @@ -613,7 +602,7 @@ public class SubscriptionImpl implements Subscription public boolean wouldSuspend(QueueEntry msg) { - return channel.wouldSuspend(msg.getMessage()); + return _acks && channel.wouldSuspend(msg.getMessage()); } public Queue getResendQueue() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b2f8cae8ff..b7cdaa29ab 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -39,6 +39,7 @@ class SubscriptionSet implements WeightedSubscriptionManager private int _currentSubscriber; private final Object _changeLock = new Object(); + private volatile boolean _exclusive; /** Accessor for unit tests. */ @@ -116,10 +117,7 @@ class SubscriptionSet implements WeightedSubscriptionManager */ public Subscription nextSubscriber(QueueEntry msg) { - if (_subscriptions.isEmpty()) - { - return null; - } + try { @@ -143,30 +141,64 @@ class SubscriptionSet implements WeightedSubscriptionManager private Subscription nextSubscriberImpl(QueueEntry msg) { - final ListIterator iterator = _subscriptions.listIterator(_currentSubscriber); - while (iterator.hasNext()) + if(_exclusive) { - Subscription subscription = iterator.next(); - ++_currentSubscriber; - subscriberScanned(); - - if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + try { - if (subscription.hasInterest(msg)) + Subscription subscription = _subscriptions.get(0); + subscriberScanned(); + + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) { - // if the queue is not empty then this client is ready to receive a message. - //FIXME the queue could be full of sent messages. - // Either need to clean all PDQs after sending a message - // OR have a clean up thread that runs the PDQs expunging the messages. - if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + if (subscription.hasInterest(msg)) { - return subscription; + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } } } } + catch(IndexOutOfBoundsException e) + { + } + return null; } + else + { + if (_subscriptions.isEmpty()) + { + return null; + } + final ListIterator iterator = _subscriptions.listIterator(_currentSubscriber); + while (iterator.hasNext()) + { + Subscription subscription = iterator.next(); + ++_currentSubscriber; + subscriberScanned(); - return null; + if (!(subscription.isSuspended() || subscription.wouldSuspend(msg))) + { + if (subscription.hasInterest(msg)) + { + // if the queue is not empty then this client is ready to receive a message. + //FIXME the queue could be full of sent messages. + // Either need to clean all PDQs after sending a message + // OR have a clean up thread that runs the PDQs expunging the messages. + if (!subscription.filtersMessages() || subscription.getPreDeliveryQueue().isEmpty()) + { + return subscription; + } + } + } + } + + return null; + } } /** Overridden in test classes. */ @@ -233,5 +265,14 @@ class SubscriptionSet implements WeightedSubscriptionManager { return _changeLock; } - + + public void setExclusive(final boolean exclusive) + { + _exclusive = exclusive; + } + + public boolean getExcBoolean() + { + return _exclusive; + } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java index 79ee6b93a3..9b91c71a1d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java @@ -22,6 +22,8 @@ package org.apache.qpid.server.queue; import java.util.LinkedList; import java.util.List; +import java.util.ArrayList; +import java.util.Collections; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfo; @@ -60,7 +62,7 @@ public class TransientMessageData * delivered. It is cleared after delivery has been attempted. Any persistent record of destinations is done * by the message handle. */ - private List _destinationQueues = new LinkedList(); + private List _destinationQueues; public MessagePublishInfo getMessagePublishInfo() { @@ -74,7 +76,7 @@ public class TransientMessageData public List getDestinationQueues() { - return _destinationQueues; + return _destinationQueues == null ? (List) Collections.EMPTY_LIST : _destinationQueues; } public void setDestinationQueues(List destinationQueues) @@ -109,6 +111,10 @@ public class TransientMessageData public void addDestinationQueue(AMQQueue queue) { + if(_destinationQueues == null) + { + _destinationQueues = new ArrayList(); + } _destinationQueues.add(queue); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 047cef9064..1e4b69c935 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -49,11 +49,11 @@ public class NonTransactionalContext implements TransactionalContext /** Where to put undeliverable messages */ private final List _returnMessages; - private Set _browsedAcks; + private final Set _browsedAcks; private final MessageStore _messageStore; - private StoreContext _storeContext; + private final StoreContext _storeContext; /** Whether we are in a transaction */ private boolean _inTran; diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 9addf83e01..afe96bcd4f 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.virtualhost; import javax.management.NotCompliantMBeanException; import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; import org.apache.qpid.server.security.access.AccessManager; @@ -123,7 +124,7 @@ public class VirtualHost implements Accessable */ public VirtualHost(String name, MessageStore store) throws Exception { - this(name, null, store); + this(name, new PropertiesConfiguration(), store); } /** diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java index fbd9e65480..ed79384d42 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueAlertTest.java @@ -173,7 +173,7 @@ public class AMQQueueAlertTest extends TestCase public void testQueueDepthAlertWithSubscribers() throws Exception { protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 2, _messageStore); protocolSession.addChannel(channel); // Create queue diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java index e72e1bf1f0..c02b47e9fd 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java @@ -123,7 +123,7 @@ public class AMQQueueMBeanTest extends TestCase TestMinaProtocolSession protocolSession = new TestMinaProtocolSession(); - AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore, null); + AMQChannel channel = new AMQChannel(protocolSession, 1, _messageStore); protocolSession.addChannel(channel); _queue.registerProtocolSession(protocolSession, 1, new AMQShortString("test"), false, null, false, false); -- cgit v1.2.1 From a62a5b1d74ea7b45778911e83d69f3e9197042b7 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Thu, 21 Feb 2008 20:42:21 +0000 Subject: QPID-804 : Fix Java Broker Python test failures git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@629981 13f79535-47bb-0310-9956-ffa450edef68 --- .../main/java/org/apache/qpid/server/queue/ExchangeBindings.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java index 60c1a8f574..e6377b33da 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ExchangeBindings.java @@ -51,7 +51,7 @@ class ExchangeBindings ExchangeBinding(AMQShortString routingKey, Exchange exchange, FieldTable arguments) { - _routingKey = routingKey; + _routingKey = routingKey == null ? AMQShortString.EMPTY_STRING : routingKey; _exchange = exchange; _arguments = arguments == null ? EMPTY_ARGUMENTS : arguments; } @@ -74,8 +74,7 @@ class ExchangeBindings public int hashCode() { return (_exchange == null ? 0 : _exchange.hashCode()) - + (_routingKey == null ? 0 : _routingKey.hashCode()) - + (_arguments == null ? 0 : _arguments.hashCode()); + + (_routingKey == null ? 0 : _routingKey.hashCode()); } public boolean equals(Object o) @@ -86,8 +85,7 @@ class ExchangeBindings } ExchangeBinding eb = (ExchangeBinding) o; return _exchange.equals(eb._exchange) - && _routingKey.equals(eb._routingKey) - && _arguments.equals(eb._arguments); + && _routingKey.equals(eb._routingKey); } } -- cgit v1.2.1 From 90b7512b4814a0efd6fd5567d6d2a21c5c14ac0b Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 22 Feb 2008 16:15:11 +0000 Subject: QPID-790 : Performance Improvements git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630239 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/exchange/DestWildExchange.java | 17 +---------------- .../apache/qpid/server/filter/JMSSelectorFilter.java | 2 +- .../org/apache/qpid/server/queue/SubscriptionSet.java | 4 ---- 3 files changed, 2 insertions(+), 21 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java index dbe7a8938a..6fa3686152 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java +++ b/java/broker/src/main/java/org/apache/qpid/server/exchange/DestWildExchange.java @@ -87,7 +87,7 @@ public class DestWildExchange extends AbstractExchange new ConcurrentHashMap>(); private final ConcurrentHashMap> _wildCardBindingKey2queues = new ConcurrentHashMap>(); - // private ConcurrentHashMap _routingKey2queue = new ConcurrentHashMap(); + private static final byte TOPIC_SEPARATOR = (byte)'.'; private static final AMQShortString TOPIC_SEPARATOR_AS_SHORTSTRING = new AMQShortString("."); private static final AMQShortString AMQP_STAR_TOKEN = new AMQShortString("*"); @@ -280,16 +280,6 @@ public class DestWildExchange extends AbstractExchange AMQShortString normalizedString = AMQShortString.join(subscriptionList, TOPIC_SEPARATOR_AS_SHORTSTRING); -/* - StringBuilder sb = new StringBuilder(); - for (AMQShortString s : subscriptionList) - { - sb.append(s); - sb.append(TOPIC_SEPARATOR); - } - - sb.deleteCharAt(sb.length() - 1); -*/ return normalizedString; } @@ -460,11 +450,6 @@ public class DestWildExchange extends AbstractExchange { AMQShortString next = routingTokens.nextToken(); - /* if (next.equals(AMQP_HASH) && routingkeyTokens.get(routingkeyTokens.size() - 1).equals(AMQP_HASH)) - { - continue; - } - */ routingkeyTokens[token++] = next; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java index 2061803d65..32f58ed666 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/JMSSelectorFilter.java @@ -56,7 +56,7 @@ public class JMSSelectorFilter implements MessageFilter catch (AMQException e) { //fixme this needs to be sorted.. it shouldn't happen - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + e.printStackTrace(); } return false; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java index b7cdaa29ab..882efd380d 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java @@ -271,8 +271,4 @@ class SubscriptionSet implements WeightedSubscriptionManager _exclusive = exclusive; } - public boolean getExcBoolean() - { - return _exclusive; - } } -- cgit v1.2.1 From dbce7647235272887a2fb4680698c6689800fe24 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 04:56:42 +0000 Subject: QPID-810 : Moved check for closingChannels higher in stack and close channel on any AMQException being thrown from the body.handle methods. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630733 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/AMQMinaProtocolSession.java | 56 +++++++++++++--------- 1 file changed, 33 insertions(+), 23 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 143ee5fa40..6f40594cb4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -109,7 +109,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable private FieldTable _clientProperties; private final List _taskList = new CopyOnWriteArrayList(); - private List _closingChannelsList = new ArrayList(); + private List _closingChannelsList = new CopyOnWriteArrayList(); private ProtocolOutputConverter _protocolOutputConverter; private Principal _authorizedID; private MethodDispatcher _dispatcher; @@ -208,9 +208,39 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable { _logger.debug("Frame Received: " + frame); } + + // Check that this channel is not closing + if (channelAwaitingClosure(channelId)) + { + if ((frame.getBodyFrame() instanceof ChannelCloseOkBody)) + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); + } + } + else + { + if (_logger.isInfoEnabled()) + { + _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); + } + + return; + } + } - body.handle(channelId, this); + + try + { + body.handle(channelId, this); + } + catch (AMQException e) + { + closeChannel(channelId); + throw e; + } } @@ -259,27 +289,6 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable final AMQMethodEvent evt = new AMQMethodEvent(channelId, methodBody); - // Check that this channel is not closing - if (channelAwaitingClosure(channelId)) - { - if ((evt.getMethod() instanceof ChannelCloseOkBody)) - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure - processing close-ok"); - } - } - else - { - if (_logger.isInfoEnabled()) - { - _logger.info("Channel[" + channelId + "] awaiting closure ignoring"); - } - - return; - } - } - try { try @@ -341,6 +350,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable _logger.info("Closing connection due to: " + e.getMessage()); } + markChannelawaitingCloseOk(channelId); closeSession(); _stateManager.changeState(AMQState.CONNECTION_CLOSING); writeFrame(e.getCloseFrame(channelId)); -- cgit v1.2.1 From 143f7f53f887b64a3ea6d8cb9d44616105924c18 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 13:54:46 +0000 Subject: QPID-107 : Initial ACL implementation for review. Implemented Permissions : Consume, Create, Publish. The Permissions are configured via XML in a user friendly way. Sections for consume, create and publish are currently used to further define Access and Bind internally. Access is granted to all users that have some permission. Bind rights are given to users with Create rights. Full details of the ACL design will be posted on the wiki : http://cwiki.apache.org/qpid/qpid-design-access-control-lists.html git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630854 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/src/main/java/log4j.properties | 2 +- .../server/handler/BasicConsumeMethodHandler.java | 4 + .../qpid/server/handler/BasicGetMethodHandler.java | 8 +- .../server/handler/BasicPublishMethodHandler.java | 6 +- .../handler/ConnectionOpenMethodHandler.java | 22 +- .../server/handler/ExchangeDeclareHandler.java | 6 +- .../qpid/server/handler/ExchangeDeleteHandler.java | 6 +- .../qpid/server/handler/QueueBindHandler.java | 6 +- .../qpid/server/handler/QueueDeclareHandler.java | 10 +- .../qpid/server/handler/QueueDeleteHandler.java | 5 + .../qpid/server/handler/QueuePurgeHandler.java | 5 + .../qpid/server/handler/QueueUnbindHandler.java | 223 ++++---- .../management/MBeanInvocationHandlerImpl.java | 2 +- .../ConfigurationFileApplicationRegistry.java | 10 +- .../qpid/server/registry/IApplicationRegistry.java | 6 +- .../qpid/server/security/access/ACLManager.java | 161 ++++++ .../qpid/server/security/access/ACLPlugin.java | 58 ++ .../security/access/AMQUserManagementMBean.java | 467 ---------------- .../qpid/server/security/access/AccessManager.java | 34 -- .../server/security/access/AccessManagerImpl.java | 155 ------ .../qpid/server/security/access/AccessResult.java | 12 +- .../qpid/server/security/access/AllowAll.java | 42 -- .../qpid/server/security/access/DenyAll.java | 41 -- .../server/security/access/FileAccessManager.java | 183 ------- .../qpid/server/security/access/Permission.java | 37 ++ .../access/PrincipalDatabaseAccessManager.java | 108 ---- .../security/access/PrincipalPermissions.java | 587 +++++++++++++++++++++ .../server/security/access/UserManagement.java | 118 ----- .../access/management/AMQUserManagementMBean.java | 468 ++++++++++++++++ .../security/access/management/UserManagement.java | 118 +++++ .../server/security/access/plugins/AllowAll.java | 68 +++ .../server/security/access/plugins/DenyAll.java | 57 ++ .../server/security/access/plugins/SimpleXML.java | 431 +++++++++++++++ .../Base64MD5PasswordFilePrincipalDatabase.java | 3 +- .../ConfigurationFilePrincipalDatabaseManager.java | 2 +- .../PlainPasswordVhostFilePrincipalDatabase.java | 130 ----- .../qpid/server/util/NullApplicationRegistry.java | 8 +- .../qpid/server/virtualhost/VirtualHost.java | 10 +- 38 files changed, 2172 insertions(+), 1447 deletions(-) create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java create mode 100755 java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java create mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java delete mode 100644 java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/log4j.properties b/java/broker/src/main/java/log4j.properties index 87f04f4991..6788c65463 100644 --- a/java/broker/src/main/java/log4j.properties +++ b/java/broker/src/main/java/log4j.properties @@ -19,6 +19,6 @@ log4j.rootCategory=${amqj.logging.level}, console log4j.appender.console=org.apache.log4j.ConsoleAppender -log4j.appender.console.Threshold=info +log4j.appender.console.Threshold=all log4j.appender.console.layout=org.apache.log4j.PatternLayout log4j.appender.console.layout.ConversionPattern=%t %d %p [%c{4}] %m%n diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index ee46b16397..bacf34f871 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -97,6 +98,9 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener @@ -79,22 +77,8 @@ public class ConnectionOpenMethodHandler implements StateAwareMethodListener try { + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.BIND, body, exch, queue, routingKey); + if (!exch.isBound(routingKey, body.getArguments(), queue)) { queue.bind(routingKey, body.getArguments(), exch); diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index 067c6ac285..b2797138c2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -20,17 +20,14 @@ */ package org.apache.qpid.server.handler; -import java.text.MessageFormat; import java.util.concurrent.atomic.AtomicInteger; import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; -import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; @@ -38,6 +35,7 @@ import org.apache.qpid.server.exchange.ExchangeRegistry; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.security.access.Permission; import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.store.MessageStore; @@ -75,6 +73,8 @@ public class QueueDeclareHandler implements StateAwareMethodListener { @@ -103,6 +104,10 @@ public class QueueDeleteHandler implements StateAwareMethodListener { @@ -100,6 +101,10 @@ public class QueuePurgeHandler implements StateAwareMethodListener -{ - private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); - - private static final QueueUnbindHandler _instance = new QueueUnbindHandler(); - - public static QueueUnbindHandler getInstance() - { - return _instance; - } - - private QueueUnbindHandler() - { - } - - public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException - { - AMQProtocolSession session = stateManager.getProtocolSession(); - VirtualHost virtualHost = session.getVirtualHost(); - ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); - QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); - - - final AMQQueue queue; - final AMQShortString routingKey; - - if (body.getQueue() == null) - { - AMQChannel channel = session.getChannel(channelId); - - if (channel == null) - { - throw body.getChannelNotFoundException(channelId); - } - - queue = channel.getDefaultQueue(); - - if (queue == null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); - } - - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); - - } - else - { - queue = queueRegistry.getQueue(body.getQueue()); - routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); - } - - if (queue == null) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); - } - final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); - if (exch == null) - { - throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); - } - - - try - { - queue.unBind(routingKey, body.getArguments(), exch); - } - catch (AMQInvalidRoutingKeyException rke) - { - throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString()); - } - catch (AMQException e) - { - if(e.getErrorCode() == AMQConstant.NOT_FOUND) - { - throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e); - } - throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); - } - - if (_log.isInfoEnabled()) - { - _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); - } - - MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); - AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody(); - session.writeFrame(responseBody.generateFrame(channelId)); - - - } -} +package org.apache.qpid.server.handler; + +import org.apache.log4j.Logger; + +import org.apache.qpid.framing.*; +import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9; +import org.apache.qpid.server.state.StateAwareMethodListener; +import org.apache.qpid.server.state.AMQStateManager; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.exchange.ExchangeRegistry; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.QueueRegistry; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.AMQChannel; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.AMQException; +import org.apache.qpid.AMQInvalidRoutingKeyException; +import org.apache.qpid.protocol.AMQConstant; + +public class QueueUnbindHandler implements StateAwareMethodListener +{ + private static final Logger _log = Logger.getLogger(QueueUnbindHandler.class); + + private static final QueueUnbindHandler _instance = new QueueUnbindHandler(); + + public static QueueUnbindHandler getInstance() + { + return _instance; + } + + private QueueUnbindHandler() + { + } + + public void methodReceived(AMQStateManager stateManager, QueueUnbindBody body, int channelId) throws AMQException + { + AMQProtocolSession session = stateManager.getProtocolSession(); + VirtualHost virtualHost = session.getVirtualHost(); + ExchangeRegistry exchangeRegistry = virtualHost.getExchangeRegistry(); + QueueRegistry queueRegistry = virtualHost.getQueueRegistry(); + + + final AMQQueue queue; + final AMQShortString routingKey; + + if (body.getQueue() == null) + { + AMQChannel channel = session.getChannel(channelId); + + if (channel == null) + { + throw body.getChannelNotFoundException(channelId); + } + + queue = channel.getDefaultQueue(); + + if (queue == null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, "No default queue defined on channel and queue was null"); + } + + routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); + + } + else + { + queue = queueRegistry.getQueue(body.getQueue()); + routingKey = body.getRoutingKey() == null ? null : body.getRoutingKey().intern(); + } + + if (queue == null) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND, "Queue " + body.getQueue() + " does not exist."); + } + final Exchange exch = exchangeRegistry.getExchange(body.getExchange()); + if (exch == null) + { + throw body.getChannelException(AMQConstant.NOT_FOUND, "Exchange " + body.getExchange() + " does not exist."); + } + + //Perform ACLs + virtualHost.getAccessManager().authorise(session, Permission.UNBIND, body, queue); + + try + { + queue.unBind(routingKey, body.getArguments(), exch); + } + catch (AMQInvalidRoutingKeyException rke) + { + throw body.getChannelException(AMQConstant.INVALID_ROUTING_KEY, routingKey.toString()); + } + catch (AMQException e) + { + if(e.getErrorCode() == AMQConstant.NOT_FOUND) + { + throw body.getConnectionException(AMQConstant.NOT_FOUND,e.getMessage(),e); + } + throw body.getChannelException(AMQConstant.CHANNEL_ERROR, e.toString()); + } + + if (_log.isInfoEnabled()) + { + _log.info("Binding queue " + queue + " to exchange " + exch + " with routing key " + routingKey); + } + + MethodRegistry_0_9 methodRegistry = (MethodRegistry_0_9) session.getMethodRegistry(); + AMQMethodBody responseBody = methodRegistry.createQueueUnbindOkBody(); + session.writeFrame(responseBody.generateFrame(channelId)); + + + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java index 4fb260472d..a0ecc2bd85 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/management/MBeanInvocationHandlerImpl.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management; -import org.apache.qpid.server.security.access.UserManagement; +import org.apache.qpid.server.security.access.management.UserManagement; import org.apache.log4j.Logger; import javax.management.remote.MBeanServerForwarder; diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index 42c32dcf00..e4e0b5b1b8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -39,8 +39,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.ConfigurationFilePrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; import org.apache.qpid.AMQException; @@ -52,7 +52,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry private AuthenticationManager _authenticationManager; - private AccessManager _accessManager; + private ACLPlugin _accessManager; private PrincipalDatabaseManager _databaseManager; @@ -110,7 +110,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _virtualHostRegistry = new VirtualHostRegistry(); - _accessManager = new AccessManagerImpl("default", _configuration); + _accessManager = ACLManager.loadACLManager("default", _configuration); _databaseManager = new ConfigurationFilePrincipalDatabaseManager(); @@ -154,7 +154,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry return _virtualHostRegistry; } - public AccessManager getAccessManager() + public ACLPlugin getAccessManager() { return _accessManager; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java index 6aac21a161..ca10fbdba2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/IApplicationRegistry.java @@ -27,7 +27,7 @@ import org.apache.qpid.server.management.ManagedObjectRegistry; import org.apache.qpid.server.plugins.PluginManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AccessManager; +import org.apache.qpid.server.security.access.ACLPlugin; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; public interface IApplicationRegistry @@ -68,8 +68,8 @@ public interface IApplicationRegistry VirtualHostRegistry getVirtualHostRegistry(); - AccessManager getAccessManager(); - + ACLPlugin getAccessManager(); + PluginManager getPluginManager(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java new file mode 100644 index 0000000000..539f32a732 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLManager.java @@ -0,0 +1,161 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.commons.configuration.Configuration; +import org.apache.commons.configuration.ConfigurationException; +import org.apache.qpid.server.registry.ApplicationRegistry; +import org.apache.qpid.server.security.access.plugins.DenyAll; +import org.apache.qpid.configuration.PropertyUtils; +import org.apache.log4j.Logger; + +import java.util.List; +import java.lang.reflect.Method; + +public class ACLManager +{ + private static final Logger _logger = Logger.getLogger(ACLManager.class); + + public static ACLPlugin loadACLManager(String name, Configuration hostConfig) throws ConfigurationException + { + ACLPlugin aclPlugin = ApplicationRegistry.getInstance().getAccessManager(); + + if (hostConfig == null) + { + _logger.warn("No Configuration specified. Using default ACLPlugin '" + aclPlugin.getPluginName() + + "' for VirtualHost:'" + name + "'"); + return aclPlugin; + } + + String accessClass = hostConfig.getString("security.access.class"); + if (accessClass == null) + { + + _logger.warn("No ACL Plugin specified. Using default ACL Plugin '" + aclPlugin.getPluginName() + + "' for VirtualHost:'" + name + "'"); + return aclPlugin; + } + + Object o; + try + { + o = Class.forName(accessClass).newInstance(); + } + catch (Exception e) + { + throw new ConfigurationException("Error initialising ACL: " + e, e); + } + + if (!(o instanceof ACLPlugin)) + { + throw new ConfigurationException("ACL Plugins must implement the ACLPlugin interface"); + } + + initialiseAccessControl((ACLPlugin) o, hostConfig); + + aclPlugin = getManager((ACLPlugin) o); + if (_logger.isInfoEnabled()) + { + _logger.info("Initialised ACL Plugin '" + aclPlugin.getPluginName() + + "' for virtualhost '" + name + "' successfully"); + } + + return aclPlugin; + } + + + private static void initialiseAccessControl(ACLPlugin accessManager, Configuration config) + throws ConfigurationException + { + //First provide the ACLPlugin with the host configuration + + accessManager.setConfiguaration(config); + + //Provide additional attribute customisation. + String baseName = "security.access.attributes.attribute."; + List argumentNames = config.getList(baseName + "name"); + List argumentValues = config.getList(baseName + "value"); + for (int i = 0; i < argumentNames.size(); i++) + { + String argName = argumentNames.get(i); + if (argName == null || argName.length() == 0) + { + throw new ConfigurationException("Access Control argument names must have length >= 1 character"); + } + if (Character.isLowerCase(argName.charAt(0))) + { + argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); + } + String methodName = "set" + argName; + Method method = null; + try + { + method = accessManager.getClass().getMethod(methodName, String.class); + } + catch (NoSuchMethodException e) + { + //do nothing as method will be null + } + + if (method == null) + { + throw new ConfigurationException("No method " + methodName + " found in class " + accessManager.getClass() + + " hence unable to configure access control. The method must be public and " + + "have a single String argument with a void return type"); + } + try + { + method.invoke(accessManager, PropertyUtils.replaceProperties(argumentValues.get(i))); + } + catch (Exception e) + { + ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause()); + ce.initCause(e); + throw ce; + } + } + } + + + private static ACLPlugin getManager(ACLPlugin manager) + { + if (manager == null) + { + if (ApplicationRegistry.getInstance().getAccessManager() == null) + { + return new DenyAll(); + } + else + { + return ApplicationRegistry.getInstance().getAccessManager(); + } + } + else + { + return manager; + } + } + + public static Logger getLogger() + { + return _logger; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java new file mode 100644 index 0000000000..7855f147b4 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/ACLPlugin.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.framing.AMQMethodBody; + +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.AMQConnectionException; +import org.apache.commons.configuration.Configuration; + + +public interface ACLPlugin +{ + /** + * Pseudo-Code: + * Identify requested RighConnectiont + * Lookup users ability for that right. + * if rightsExists + * Validate right on object + * Return result + * e.g + * User, CONSUME , Queue + * User, CONSUME , Exchange + RoutingKey + * User, PUBLISH , Exchange + RoutingKey + * User, CREATE , Exchange || Queue + * User, BIND , Exchange + RoutingKey + Queue + * + * @param session - The session requesting access + * @param permission - The permission requested + * @param parameters - The above objects that are used to authorise the request. + * @return The AccessResult decision + */ + //todo potential refactor this ConnectionException Out of here + AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException; + + String getPluginName(); + + void setConfiguaration(Configuration config); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java deleted file mode 100644 index 2dc7fcbc1e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AMQUserManagementMBean.java +++ /dev/null @@ -1,467 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.management.MBeanDescription; -import org.apache.qpid.server.management.AMQManagedObject; -import org.apache.qpid.server.management.MBeanOperation; -import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.log4j.Logger; -import org.apache.commons.configuration.ConfigurationException; - -import javax.management.JMException; -import javax.management.remote.JMXPrincipal; -import javax.management.openmbean.TabularData; -import javax.management.openmbean.TabularDataSupport; -import javax.management.openmbean.TabularType; -import javax.management.openmbean.SimpleType; -import javax.management.openmbean.CompositeType; -import javax.management.openmbean.OpenType; -import javax.management.openmbean.OpenDataException; -import javax.management.openmbean.CompositeData; -import javax.management.openmbean.CompositeDataSupport; -import javax.security.auth.login.AccountNotFoundException; -import javax.security.auth.Subject; -import java.io.File; -import java.io.FileInputStream; -import java.io.IOException; -import java.io.FileOutputStream; -import java.util.Properties; -import java.util.List; -import java.util.Enumeration; -import java.util.Set; -import java.util.concurrent.locks.ReentrantLock; -import java.security.Principal; -import java.security.AccessControlContext; -import java.security.AccessController; - -/** MBean class for AMQUserManagementMBean. It implements all the management features exposed for managing users. */ -@MBeanDescription("User Management Interface") -public class AMQUserManagementMBean extends AMQManagedObject implements UserManagement -{ - - private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class); - - private PrincipalDatabase _principalDatabase; - private String _accessFileName; - private Properties _accessRights; - // private File _accessFile; - private ReentrantLock _accessRightsUpdate = new ReentrantLock(); - - // Setup for the TabularType - static TabularType _userlistDataType; // Datatype for representing User Lists - - static CompositeType _userDataType; // Composite type for representing User - static String[] _userItemNames = {"Username", "read", "write", "admin"}; - - static - { - String[] userItemDesc = {"Broker Login username", "Management Console Read Permission", - "Management Console Write Permission", "Management Console Admin Permission"}; - - OpenType[] userItemTypes = new OpenType[4]; // User item types. - userItemTypes[0] = SimpleType.STRING; // For Username - userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read - userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write - userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin - String[] userDataIndex = {_userItemNames[0]}; - - try - { - _userDataType = - new CompositeType("User", "User Data", _userItemNames, userItemDesc, userItemTypes); - - _userlistDataType = new TabularType("Users", "List of users", _userDataType, userDataIndex); - } - catch (OpenDataException e) - { - _logger.error("Tabular data setup for viewing users incorrect."); - _userlistDataType = null; - } - } - - - public AMQUserManagementMBean() throws JMException - { - super(UserManagement.class, UserManagement.TYPE); - } - - public String getObjectInstanceName() - { - return UserManagement.TYPE; - } - - public boolean setPassword(String username, char[] password) - { - try - { - //delegate password changes to the Principal Database - return _principalDatabase.updatePassword(new UsernamePrincipal(username), password); - } - catch (AccountNotFoundException e) - { - _logger.warn("Attempt to set password of non-existant user'" + username + "'"); - return false; - } - } - - public boolean setRights(String username, boolean read, boolean write, boolean admin) - { - - if (_accessRights.get(username) == null) - { - // If the user doesn't exist in the user rights file check that they at least have an account. - if (_principalDatabase.getUser(username) == null) - { - return false; - } - } - - try - { - - _accessRightsUpdate.lock(); - - // Update the access rights - if (admin) - { - _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN); - } - else - { - if (read | write) - { - if (read) - { - _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY); - } - if (write) - { - _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE); - } - } - else - { - _accessRights.remove(username); - } - } - - saveAccessFile(); - } - finally - { - if (_accessRightsUpdate.isHeldByCurrentThread()) - { - _accessRightsUpdate.unlock(); - } - } - - return true; - } - - public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin) - { - if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) - { - _accessRights.put(username, ""); - - return setRights(username, read, write, admin); - } - - return false; - } - - public boolean deleteUser(String username) - { - - try - { - if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username))) - { - try - { - _accessRightsUpdate.lock(); - - _accessRights.remove(username); - saveAccessFile(); - } - finally - { - if (_accessRightsUpdate.isHeldByCurrentThread()) - { - _accessRightsUpdate.unlock(); - } - } - return true; - } - } - catch (AccountNotFoundException e) - { - _logger.warn("Attempt to delete user (" + username + ") that doesn't exist"); - } - - return false; - } - - public boolean reloadData() - { - try - { - try - { - loadAccessFile(); - } - catch (ConfigurationException e) - { - _logger.info("Reload failed due to:" + e); - return false; - } - - // Reload successful - return true; - } - catch (IOException e) - { - _logger.info("Reload failed due to:" + e); - // Reload unsuccessful - return false; - } - } - - - @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.") - public TabularData viewUsers() - { - // Table of users - // Username(string), Access rights Read,Write,Admin(bool,bool,bool) - - if (_userlistDataType == null) - { - _logger.warn("TabluarData not setup correctly"); - return null; - } - - List users = _principalDatabase.getUsers(); - - TabularDataSupport userList = new TabularDataSupport(_userlistDataType); - - try - { - // Create the tabular list of message header contents - for (Principal user : users) - { - // Create header attributes list - - String rights = (String) _accessRights.get(user.getName()); - - Boolean read = false; - Boolean write = false; - Boolean admin = false; - - if (rights != null) - { - read = rights.equals(MBeanInvocationHandlerImpl.READONLY) - || rights.equals(MBeanInvocationHandlerImpl.READWRITE); - write = rights.equals(MBeanInvocationHandlerImpl.READWRITE); - admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN); - } - - Object[] itemData = {user.getName(), read, write, admin}; - CompositeData messageData = new CompositeDataSupport(_userDataType, _userItemNames, itemData); - userList.put(messageData); - } - } - catch (OpenDataException e) - { - _logger.warn("Unable to create user list due to :" + e); - return null; - } - - return userList; - } - - /*** Broker Methods **/ - - /** - * setPrincipalDatabase - * - * @param database set The Database to use for user lookup - */ - public void setPrincipalDatabase(PrincipalDatabase database) - { - _principalDatabase = database; - } - - /** - * setAccessFile - * - * @param accessFile the file to use for updating. - * - * @throws java.io.IOException If the file cannot be accessed - * @throws org.apache.commons.configuration.ConfigurationException - * if checks on the file fail. - */ - public void setAccessFile(String accessFile) throws IOException, ConfigurationException - { - _accessFileName = accessFile; - - if (_accessFileName != null) - { - loadAccessFile(); - } - else - { - _logger.warn("Access rights file specified is null. Access rights not changed."); - } - } - - private void loadAccessFile() throws IOException, ConfigurationException - { - try - { - _accessRightsUpdate.lock(); - - Properties accessRights = new Properties(); - - File accessFile = new File(_accessFileName); - - if (!accessFile.exists()) - { - throw new ConfigurationException("'" + _accessFileName + "' does not exist"); - } - - if (!accessFile.canRead()) - { - throw new ConfigurationException("Cannot read '" + _accessFileName + "'."); - } - - if (!accessFile.canWrite()) - { - _logger.warn("Unable to write to access file '" + _accessFileName + "' changes will not be preserved."); - } - - accessRights.load(new FileInputStream(accessFile)); - checkAccessRights(accessRights); - setAccessRights(accessRights); - } - finally - { - if (_accessRightsUpdate.isHeldByCurrentThread()) - { - _accessRightsUpdate.unlock(); - } - } - } - - private void checkAccessRights(Properties accessRights) - { - Enumeration values = accessRights.propertyNames(); - - while (values.hasMoreElements()) - { - String user = (String) values.nextElement(); - - if (_principalDatabase.getUser(user) == null) - { - _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user"); - } - } - } - - private void saveAccessFile() - { - try - { - _accessRightsUpdate.lock(); - try - { - // remove old temporary file - File tmp = new File(_accessFileName + ".tmp"); - if (tmp.exists()) - { - tmp.delete(); - } - - //remove old backup - File old = new File(_accessFileName + ".old"); - if (old.exists()) - { - old.delete(); - } - - // Rename current file - File rights = new File(_accessFileName); - rights.renameTo(old); - - FileOutputStream output = new FileOutputStream(tmp); - _accessRights.store(output, "Generated by AMQUserManagementMBean Console : Last edited by user:" + getCurrentJMXUser()); - output.close(); - - // Rename new file to main file - tmp.renameTo(rights); - - // delete tmp - tmp.delete(); - } - catch (IOException e) - { - _logger.warn("Problem occured saving '" + _accessFileName + "' changes may not be preserved. :" + e); - } - } - finally - { - if (_accessRightsUpdate.isHeldByCurrentThread()) - { - _accessRightsUpdate.unlock(); - } - } - } - - private String getCurrentJMXUser() - { - AccessControlContext acc = AccessController.getContext(); - Subject subject = Subject.getSubject(acc); - - // Retrieve JMXPrincipal from Subject - Set principals = subject.getPrincipals(JMXPrincipal.class); - if (principals == null || principals.isEmpty()) - { - return "Unknown user principals were null"; - } - - Principal principal = principals.iterator().next(); - return principal.getName(); - } - - /** - * user=read user=write user=readwrite user=admin - * - * @param accessRights The properties list of access rights to process - */ - private void setAccessRights(Properties accessRights) - { - _logger.debug("Setting Access Rights:" + accessRights); - _accessRights = accessRights; - MBeanInvocationHandlerImpl.setAccessRights(_accessRights); - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java deleted file mode 100644 index d70a6dc8f4..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManager.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public interface AccessManager -{ - AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights); - - @Deprecated - AccessResult isAuthorized(Accessable accessObject, String username); - - String getName(); - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java deleted file mode 100644 index 35d036d20f..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessManagerImpl.java +++ /dev/null @@ -1,155 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.commons.configuration.Configuration; -import org.apache.commons.configuration.ConfigurationException; -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.qpid.configuration.PropertyUtils; -import org.apache.log4j.Logger; - -import java.util.List; -import java.lang.reflect.Method; -import java.security.Principal; - -public class AccessManagerImpl implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(AccessManagerImpl.class); - - AccessManager _accessManager; - - public AccessManagerImpl(String name, Configuration hostConfig) throws ConfigurationException - { - if (hostConfig == null) - { - _logger.warn("No Configuration specified. Using default access controls for VirtualHost:'" + name + "'"); - return; - } - - String accessClass = hostConfig.getString("security.access.class"); - if (accessClass == null) - { - _logger.warn("No access control specified. Using default access controls for VirtualHost:'" + name + "'"); - return; - } - - Object o; - try - { - o = Class.forName(accessClass).newInstance(); - } - catch (Exception e) - { - throw new ConfigurationException("Error initialising access control: " + e, e); - } - - if (!(o instanceof AccessManager)) - { - throw new ConfigurationException("Access control must implement the VirtualHostAccess interface"); - } - - initialiseAccessControl((AccessManager) o, hostConfig); - - _accessManager = (AccessManager) o; - - _logger.info("Initialised access control for virtualhost '" + name + "' successfully"); - - } - - - private void initialiseAccessControl(AccessManager accessManager, Configuration config) - throws ConfigurationException - { - String baseName = "security.access.attributes.attribute."; - List argumentNames = config.getList(baseName + "name"); - List argumentValues = config.getList(baseName + "value"); - for (int i = 0; i < argumentNames.size(); i++) - { - String argName = argumentNames.get(i); - if (argName == null || argName.length() == 0) - { - throw new ConfigurationException("Access Control argument names must have length >= 1 character"); - } - if (Character.isLowerCase(argName.charAt(0))) - { - argName = Character.toUpperCase(argName.charAt(0)) + argName.substring(1); - } - String methodName = "set" + argName; - Method method = null; - try - { - method = accessManager.getClass().getMethod(methodName, String.class); - } - catch (NoSuchMethodException e) - { - //do nothing as method will be null - } - - if (method == null) - { - throw new ConfigurationException("No method " + methodName + " found in class " + accessManager.getClass() + - " hence unable to configure access control. The method must be public and " + - "have a single String argument with a void return type"); - } - try - { - method.invoke(accessManager, PropertyUtils.replaceProperties(argumentValues.get(i))); - } - catch (Exception e) - { - ConfigurationException ce = new ConfigurationException(e.getMessage(), e.getCause()); - ce.initCause(e); - throw ce; - } - } - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) - { - if (_accessManager == null) - { - if (ApplicationRegistry.getInstance().getAccessManager() == this) - { - _logger.warn("No Default access manager specified DENYING ALL ACCESS"); - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - else - { - return ApplicationRegistry.getInstance().getAccessManager().isAuthorized(accessObject, user, rights); - } - } - else - { - return _accessManager.isAuthorized(accessObject, user, rights); - } - } - - public String getName() - { - return "AccessManagerImpl"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java index b8d8fc605a..89cead69b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/AccessResult.java @@ -30,15 +30,15 @@ public class AccessResult StringBuilder _authorizer; AccessStatus _status; - public AccessResult(AccessManager authorizer, AccessStatus status) + public AccessResult(ACLPlugin authorizer, AccessStatus status) { _status = status; - _authorizer = new StringBuilder(authorizer.getName()); + _authorizer = new StringBuilder(authorizer.getPluginName()); } - public void setAuthorizer(AccessManager authorizer) + public void setAuthorizer(ACLPlugin authorizer) { - _authorizer.append(authorizer.getName()); + _authorizer.append(authorizer.getPluginName()); } public String getAuthorizer() @@ -56,10 +56,10 @@ public class AccessResult return _status; } - public void addAuthorizer(AccessManager accessManager) + public void addAuthorizer(ACLPlugin accessManager) { _authorizer.insert(0, "->"); - _authorizer.insert(0, accessManager.getName()); + _authorizer.insert(0, accessManager.getPluginName()); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java deleted file mode 100644 index 1ddca3a64e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/AllowAll.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public class AllowAll implements AccessManager -{ - - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - - public String getName() - { - return "AllowAll"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java deleted file mode 100644 index bf40eeba4e..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/DenyAll.java +++ /dev/null @@ -1,41 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import java.security.Principal; - -public class DenyAll implements AccessManager -{ - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "DenyAll"; - } -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java deleted file mode 100644 index 291bc714ed..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/FileAccessManager.java +++ /dev/null @@ -1,183 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.log4j.Logger; - -import java.io.IOException; -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.FileNotFoundException; -import java.io.File; -import java.util.regex.Pattern; -import java.security.Principal; - -/** - * Represents a user database where the account information is stored in a simple flat file. - * - * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn - * - * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. - */ -public class FileAccessManager implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(FileAccessManager.class); - - protected File _accessFile; - - protected Pattern _regexp = Pattern.compile(":"); - - private static final short USER_INDEX = 0; - private static final short VIRTUALHOST_INDEX = 1; - - public void setAccessFile(String accessFile) throws FileNotFoundException - { - File f = new File(accessFile); - _logger.info("FileAccessManager using file " + f.getAbsolutePath()); - _accessFile = f; - if (!f.exists()) - { - throw new FileNotFoundException("Cannot find access file " + f); - } - if (!f.canRead()) - { - throw new FileNotFoundException("Cannot read access file " + f + - ". Check permissions."); - } - } - - /** - * Looks up the virtual hosts for a specified user in the access file. - * - * @param user The user to lookup - * - * @return a list of virtualhosts - */ - private VirtualHostAccess[] lookupVirtualHost(String user) - { - String[] results = lookup(user, VIRTUALHOST_INDEX); - VirtualHostAccess vhosts[] = new VirtualHostAccess[results.length]; - - for (int index = 0; index < results.length; index++) - { - vhosts[index] = new VirtualHostAccess(results[index]); - } - - return vhosts; - } - - - private String[] lookup(String user, int index) - { - try - { - BufferedReader reader = null; - try - { - reader = new BufferedReader(new FileReader(_accessFile)); - String line; - - while ((line = reader.readLine()) != null) - { - String[] result = _regexp.split(line); - if (result == null || result.length < (index + 1)) - { - continue; - } - - if (user.equals(result[USER_INDEX])) - { - return result[index].split(","); - } - } - return null; - } - finally - { - if (reader != null) - { - reader.close(); - } - } - } - catch (IOException ioe) - { - //ignore - } - return null; - } - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) - { - if (accessObject instanceof VirtualHost) - { - VirtualHostAccess[] hosts = lookupVirtualHost(user.getName()); - - if (hosts != null) - { - for (VirtualHostAccess host : hosts) - { - if (accessObject.getAccessableName().equals(host.getVirtualHost())) - { - if (host.getAccessRights().allows(rights)) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - else - { - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - } - } - } - } -// else if (accessObject instanceof AMQQueue) -// { -// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost()); -// -// if (queues != null) -// { -// for (String queue : queues) -// { -// if (accessObject.getAccessableName().equals(queue)) -// { -// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); -// } -// } -// } -// } - - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "FileAccessManager"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java new file mode 100644 index 0000000000..5d439a99eb --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/Permission.java @@ -0,0 +1,37 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.queue.AMQQueue; + +public enum Permission +{ + CONSUME, + PUBLISH, + CREATE, + ACCESS, + BIND, + UNBIND, + DELETE, + PURGE +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java deleted file mode 100644 index 6ccadb2e7d..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalDatabaseAccessManager.java +++ /dev/null @@ -1,108 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.security.auth.database.PrincipalDatabase; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.log4j.Logger; - -import java.security.Principal; - -public class PrincipalDatabaseAccessManager implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(PrincipalDatabaseAccessManager.class); - - PrincipalDatabase _database; - AccessManager _default; - - public PrincipalDatabaseAccessManager() - { - _default = null; - } - - public void setDefaultAccessManager(String defaultAM) - { - if (defaultAM.equals("AllowAll")) - { - _default = new AllowAll(); - } - - if (defaultAM.equals("DenyAll")) - { - _default = new DenyAll(); - } - } - - public void setPrincipalDatabase(String database) - { - _database = ApplicationRegistry.getInstance().getDatabaseManager().getDatabases().get(database); - if (!(_database instanceof AccessManager)) - { - _logger.warn("Database '" + database + "' cannot perform access management"); - } - } - - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal username, AccessRights.Rights rights) - { - AccessResult result; - - if (_database == null) - { - if (_default != null) - { - result = _default.isAuthorized(accessObject, username, rights); - } - else - { - throw new RuntimeException("Principal Database and default Access Manager are both null unable to perform Access Control"); - } - } - else - { - if (!(_database instanceof AccessManager)) - { - _logger.warn("Specified PrincipalDatabase is not an AccessManager so using default AccessManager"); - result = _default.isAuthorized(accessObject, username, rights); - } - else - { - result = ((AccessManager) _database).isAuthorized(accessObject, username, rights); - } - } - - result.addAuthorizer(this); - - return result; - } - - public String getName() - { - return "PrincipalDatabaseFileAccessManager"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java new file mode 100755 index 0000000000..22f1cf25a8 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/PrincipalPermissions.java @@ -0,0 +1,587 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.security.access; + +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.QueueBindBody; +import org.apache.qpid.framing.QueueDeclareBody; +import org.apache.qpid.framing.ExchangeDeclareBody; +import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.exchange.Exchange; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +public class PrincipalPermissions +{ + + private static final int CONSUME_QUEUES_KEY = 0; + private static final int CONSUME_TEMPORARY_KEY = 1; + private static final int CONSUME_OWN_QUEUES_ONLY_KEY = 2; + + private static final int CREATE_QUEUES_KEY = 0; + private static final int CREATE_EXCHANGES_KEY = 1; + + private static final int CREATE_QUEUE_TEMPORARY_KEY = 2; + private static final int CREATE_QUEUE_QUEUES_KEY = 1; + private static final int CREATE_QUEUE_EXCHANGES_KEY = 0; + + private static final int CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY = 0; + private static final int CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY = 1; + + private static final int PUBLISH_EXCHANGES_KEY = 0; + + private Map _permissions; + + private String _user; + + + public PrincipalPermissions(String user) + { + _user = user; + _permissions = new ConcurrentHashMap(); + } + + public void grant(Permission permission, Object... parameters) + { + switch (permission) + { + case ACCESS: + break; // This is a no-op as the existence of this PrincipalPermission object is scoped per VHost for ACCESS + case BIND: + break; // All the details are currently included in the create setup. + case CONSUME: // Parameters : AMQShortString queueName, Boolean Temporary, Boolean ownQueueOnly + Map consumeRights = (Map) _permissions.get(permission); + + if (consumeRights == null) + { + consumeRights = new ConcurrentHashMap(); + _permissions.put(permission, consumeRights); + } + + //if we have parametsre + if (parameters.length > 0) + { + AMQShortString queueName = (AMQShortString) parameters[0]; + Boolean temporary = (Boolean) parameters[1]; + Boolean ownQueueOnly = (Boolean) parameters[2]; + + if (temporary) + { + consumeRights.put(CONSUME_TEMPORARY_KEY, true); + } + else + { + consumeRights.put(CONSUME_TEMPORARY_KEY, false); + } + + if (ownQueueOnly) + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, true); + } + else + { + consumeRights.put(CONSUME_OWN_QUEUES_ONLY_KEY, false); + } + + + LinkedList queues = (LinkedList) consumeRights.get(CONSUME_QUEUES_KEY); + if (queues == null) + { + queues = new LinkedList(); + consumeRights.put(CONSUME_QUEUES_KEY, queues); + } + + if (queueName != null) + { + queues.add(queueName); + } + } + + + break; + case CREATE: // Parameters : Boolean temporary, AMQShortString queueName + // , AMQShortString exchangeName , AMQShortString routingKey + // || AMQShortString exchangeName , AMQShortString Class + + Map createRights = (Map) _permissions.get(permission); + + if (createRights == null) + { + createRights = new ConcurrentHashMap(); + _permissions.put(permission, createRights); + + } + + //The existence of the empty map mean permission to all. + if (parameters.length == 0) + { + return; + } + + + if (parameters[0] instanceof Boolean) //Create Queue : + // Boolean temporary, [AMQShortString queueName, AMQShortString exchangeName , AMQShortString routingKey] + { + Boolean temporary = (Boolean) parameters[0]; + + AMQShortString queueName = parameters.length > 1 ? (AMQShortString) parameters[1] : null; + AMQShortString exchangeName = parameters.length > 2 ? (AMQShortString) parameters[2] : null; + //Set the routingkey to the specified value or the queueName if present + AMQShortString routingKey = parameters.length > 3 ? (AMQShortString) parameters[3] : queueName; + + // Get the queues map + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + if (create_queues == null) + { + create_queues = new ConcurrentHashMap(); + createRights.put(CREATE_QUEUES_KEY, create_queues); + } + + //Allow all temp queues to be created + create_queues.put(CREATE_QUEUE_TEMPORARY_KEY, temporary); + + //Create empty list of queues + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + if (create_queues_queues == null) + { + create_queues_queues = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_QUEUES_KEY, create_queues_queues); + } + + // We are granting CREATE rights to all temporary queues only + if (parameters.length == 1) + { + return; + } + + // if we have a queueName then we need to store any associated exchange / rk bindings + if (queueName != null) + { + Map queue = (Map) create_queues_queues.get(queueName); + if (queue == null) + { + queue = new ConcurrentHashMap(); + create_queues_queues.put(queueName, queue); + } + + if (exchangeName != null) + { + queue.put(exchangeName, routingKey); + } + + //If no exchange is specified then the presence of the queueName in the map says any exchange is ok + } + + // Store the exchange that we are being granted rights to. This will be used as part of binding + + //Lookup the list of exchanges + Map create_queues_exchanges = (Map) create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + if (create_queues_exchanges == null) + { + create_queues_exchanges = new ConcurrentHashMap(); + create_queues.put(CREATE_QUEUE_EXCHANGES_KEY, create_queues_exchanges); + } + + //if we have an exchange + if (exchangeName != null) + { + //Retrieve the list of permitted exchanges. + Map exchanges = (Map) create_queues_exchanges.get(exchangeName); + + if (exchanges == null) + { + exchanges = new ConcurrentHashMap(); + create_queues_exchanges.put(exchangeName, exchanges); + } + + //Store the temporary setting CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY + exchanges.put(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY, temporary); + + //Store the binding details of queue/rk for this exchange. + if (queueName != null) + { + //Retrieve the list of permitted routingKeys. + Map rKeys = (Map) exchanges.get(exchangeName); + + if (rKeys == null) + { + rKeys = new ConcurrentHashMap(); + exchanges.put(CREATE_QUEUE_EXCHANGES_ROUTINGKEYS_KEY, rKeys); + } + + rKeys.put(queueName, routingKey); + } + } + } + else // Create Exchange : AMQShortString exchangeName , AMQShortString Class + { + Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY); + + if (create_exchanges == null) + { + create_exchanges = new ConcurrentHashMap(); + createRights.put(CREATE_EXCHANGES_KEY, create_exchanges); + } + + //Should perhaps error if parameters[0] is null; + AMQShortString exchangeName = parameters.length > 0 ? (AMQShortString) parameters[0] : null; + AMQShortString className = parameters.length > 1 ? (AMQShortString) parameters[1] : null; + + //Store the exchangeName / class mapping if the mapping is null + createRights.put(exchangeName, className); + } + break; + case DELETE: + break; + + case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + publishRights = new ConcurrentHashMap(); + _permissions.put(permission, publishRights); + } + + if (parameters == null || parameters.length == 0) + { + //If we have no parameters then allow publish to all destinations + // this is signified by having a null value for publish_exchanges + } + else + { + Map publish_exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + if (publish_exchanges == null) + { + publish_exchanges = new ConcurrentHashMap(); + publishRights.put(PUBLISH_EXCHANGES_KEY, publish_exchanges); + } + + + HashSet routingKeys = (HashSet) publish_exchanges.get(parameters[0]); + + // Check to see if we have a routing key + if (parameters.length == 2) + { + if (routingKeys == null) + { + routingKeys = new HashSet(); + } + //Add routing key to permitted publish destinations + routingKeys.add(parameters[1]); + } + + // Add the updated routingkey list or null if all values allowed + publish_exchanges.put(parameters[0], routingKeys); + } + break; + case PURGE: + break; + case UNBIND: + break; + } + + } + + public boolean authorise(Permission permission, Object... parameters) + { + + switch (permission) + { + case ACCESS: + return true; // This is here for completeness but the SimpleXML ACLManager never calls it. + // The existence of this user specific PP can be validated in the map SimpleXML maintains. + case BIND: // Parameters : QueueBindMethod , Exchange , AMQQueue, AMQShortString routingKey + +// QueueDeclareBody body = (QueueDeclareBody) parameters[0]; + + Exchange exchange = (Exchange) parameters[1]; + + if (exchange.getName().equals("<>")) + { + // Binding to <> can not be programmed via ACLs due to '<','>' unable to be used in the XML + System.err.println("Binding on exchange <> not alowed via ACLs"); + } + + AMQQueue bind_queueName = (AMQQueue) parameters[2]; + AMQShortString routingKey = (AMQShortString) parameters[3]; + + //Get all Create Rights for this user + Map bindCreateRights = (Map) _permissions.get(Permission.CREATE); + + //Look up the Queue Creation Rights + Map bind_create_queues = (Map) bindCreateRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues + Map bind_create_queues_queues = (Map) bindCreateRights.get(CREATE_QUEUE_QUEUES_KEY); + + // Check and see if we have a queue white list to check + if (bind_create_queues_queues != null) + { + //There a white list for queues + Map exchangeDetails = (Map) bind_create_queues_queues.get(bind_queueName); + + if (exchangeDetails == null) //Then all queue can be bound to all exchanges. + { + return true; + } + + // Check to see if we have a white list of routingkeys to check + Map rkeys = (Map) exchangeDetails.get(exchange.getName()); + + // if keys is null then any rkey is allowed on this exchange + if (rkeys == null) + { + // There is no routingkey white list + return true; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = rkeys.keySet().iterator(); + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + if (rkey.endsWith("*")) + { + matched = routingKey.startsWith(rkey.subSequence(0, rkey.length() - 1).toString()); + } + else + { + matched = routingKey.equals(rkey); + } + } + + + return matched; + } + + + } + else + { + //There a is no white list for queues + + // So can allow all queues to be bound + // but we should first check and see if we have a temp queue and validate that we are allowed + // to bind temp queues. + + //Check to see if we have a temporary queue + if (bind_queueName.isAutoDelete()) + { + // Check and see if we have an exchange white list. + Map bind_exchanges = (Map) bind_create_queues.get(CREATE_QUEUE_EXCHANGES_KEY); + + // If the exchange exists then we must check to see if temporary queues are allowed here + if (bind_exchanges != null) + { + // Check to see if the requested exchange is allowed. + Map exchangeDetails = (Map) bind_exchanges.get(exchange.getName()); + + return (Boolean) exchangeDetails.get(CREATE_QUEUE_EXCHANGES_TEMPORARY_KEY); + } + + //no white list so all allowed, drop through to return true below. + } + + // not a temporary queue and no white list so all allowed. + return true; + } + + case CREATE:// Paramters : QueueDeclareBody || ExchangeDeclareBody + + Map createRights = (Map) _permissions.get(permission); + + // If there are no create rights then deny request + if (createRights == null) + { + return false; + } + + if (parameters.length == 1) + { + if (parameters[0] instanceof QueueDeclareBody) + { + QueueDeclareBody body = (QueueDeclareBody) parameters[0]; + + //Look up the Queue Creation Rights + Map create_queues = (Map) createRights.get(CREATE_QUEUES_KEY); + + //Lookup the list of queues allowed to be created + Map create_queues_queues = (Map) create_queues.get(CREATE_QUEUE_QUEUES_KEY); + + + AMQShortString queueName = body.getQueue(); + + + if (body.getAutoDelete())// we have a temporary queue + { + return (Boolean) create_queues.get(CREATE_QUEUE_TEMPORARY_KEY); + } + else + { + // If there is a white list then check + return create_queues_queues == null || create_queues_queues.containsKey(queueName); + } + + } + else if (parameters[0] instanceof ExchangeDeclareBody) + { + ExchangeDeclareBody body = (ExchangeDeclareBody) parameters[0]; + + AMQShortString exchangeName = body.getExchange(); + + Map create_exchanges = (Map) createRights.get(CREATE_EXCHANGES_KEY); + + // If the exchange list is doesn't exist then all is allowed else check the valid exchanges + return create_exchanges == null || create_exchanges.containsKey(exchangeName); + } + } + break; + case CONSUME: // Parameters : AMQQueue + + if (parameters.length == 1 && parameters[0] instanceof AMQQueue) + { + AMQQueue queue = ((AMQQueue) parameters[0]); + Map queuePermissions = (Map) _permissions.get(permission); + + List queues = (List) queuePermissions.get(CONSUME_QUEUES_KEY); + + Boolean temporayQueues = (Boolean) queuePermissions.get(CONSUME_TEMPORARY_KEY); + Boolean ownQueuesOnly = (Boolean) queuePermissions.get(CONSUME_OWN_QUEUES_ONLY_KEY); + + // If user is allowed to publish to temporary queues and this is a temp queue then allow it. + if (temporayQueues) + { + if (queue.isAutoDelete()) + // This will allow consumption from any temporary queue including ones not owned by this user. + // Of course the exclusivity will not be broken. + { + // if not limited to ownQueuesOnly then ok else check queue Owner. + return !ownQueuesOnly || queue.getOwner().equals(_user); + } + else + { + return false; + } + } + + // if queues are white listed then ensure it is ok + if (queues != null) + { + // if no queues are listed then ALL are ok othereise it must be specified. + if (ownQueuesOnly) + { + if (queue.getOwner().equals(_user)) + { + return queues.size() == 0 || queues.contains(queue.getName()); + } + else + { + return false; + } + } + + // If we are + return queues.size() == 0 || queues.contains(queue.getName()); + } + } + + // Can't authenticate without the right parameters + return false; + case DELETE: + break; + + case PUBLISH: // Parameters : Exchange exchange, AMQShortString routingKey + Map publishRights = (Map) _permissions.get(permission); + + if (publishRights == null) + { + return false; + } + + Map exchanges = (Map) publishRights.get(PUBLISH_EXCHANGES_KEY); + + // Having no exchanges listed gives full publish rights to all exchanges + if (exchanges == null) + { + return true; + } + // Otherwise exchange must be listed in the white list + + // If the map doesn't have the exchange then it isn't allowed + if (!exchanges.containsKey(parameters[0])) + { + return false; + } + else + { + + // Get valid routing keys + HashSet routingKeys = (HashSet) exchanges.get(parameters[0]); + + // Having no routingKeys in the map then all are allowed. + if (routingKeys == null) + { + return true; + } + else + { + // We have routingKeys so a match must be found to allowed binding + Iterator keys = routingKeys.iterator(); + + + AMQShortString publishRKey = (AMQShortString)parameters[1]; + + boolean matched = false; + while (keys.hasNext() && !matched) + { + AMQShortString rkey = (AMQShortString) keys.next(); + + if (rkey.endsWith("*")) + { + matched = publishRKey.startsWith(rkey.subSequence(0, rkey.length() - 1)); + } + else + { + matched = publishRKey.equals(rkey); + } + } + return matched; + } + } + case PURGE: + break; + case UNBIND: + break; + + } + + return false; + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java deleted file mode 100644 index b8762aa43b..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/UserManagement.java +++ /dev/null @@ -1,118 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.access; - -import org.apache.qpid.server.management.MBeanOperation; -import org.apache.qpid.server.management.MBeanOperationParameter; -import org.apache.qpid.server.management.MBeanAttribute; -import org.apache.qpid.AMQException; - -import javax.management.openmbean.TabularData; -import javax.management.openmbean.CompositeData; -import javax.management.JMException; -import javax.management.MBeanOperationInfo; -import java.io.IOException; - -public interface UserManagement -{ - String TYPE = "UserManagement"; - - //********** Operations *****************// - /** - * set password for user - * - * @param username The username to create - * @param password The password for the user - * - * @return The result of the operation - */ - @MBeanOperation(name = "setPassword", description = "Set password for user.", - impact = MBeanOperationInfo.ACTION) - boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")char[] password); - - /** - * set rights for users with given details - * - * @param username The username to create - * @param read The set of permission to give the new user - * @param write The set of permission to give the new user - * @param admin The set of permission to give the new user - * - * @return The result of the operation - */ - @MBeanOperation(name = "setRights", description = "Set access rights for user.", - impact = MBeanOperationInfo.ACTION) - boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "readAndWrite", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); - - /** - * Create users with given details - * - * @param username The username to create - * @param password The password for the user - * @param read The set of permission to give the new user - * @param write The set of permission to give the new user - * @param admin The set of permission to give the new user - * - * @return The result of the operation - */ - @MBeanOperation(name = "createUser", description = "Create new user from system.", - impact = MBeanOperationInfo.ACTION) - boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, - @MBeanOperationParameter(name = "password", description = "Password")char[] password, - @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, - @MBeanOperationParameter(name = "readAndWrite", description = "Administration write")boolean write, - @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); - - /** - * View users returns all the users that are currently available to the system. - * - * @param username The user to delete - * - * @return The result of the operation - */ - @MBeanOperation(name = "deleteUser", description = "Delete user from system.", - impact = MBeanOperationInfo.ACTION) - boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username); - - - /** - * Reload the date from disk - * - * @return The result of the operation - */ - @MBeanOperation(name = "reloadData", description = "Reload the authentication file from disk.", - impact = MBeanOperationInfo.ACTION) - boolean reloadData(); - - /** - * View users returns all the users that are currently available to the system. - * - * @return a table of users data (Username, read, write, admin) - */ - @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.", - impact = MBeanOperationInfo.INFO) - TabularData viewUsers(); - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java new file mode 100644 index 0000000000..a8ae03cc5d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/AMQUserManagementMBean.java @@ -0,0 +1,468 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.management; + +import org.apache.qpid.server.management.MBeanDescription; +import org.apache.qpid.server.management.AMQManagedObject; +import org.apache.qpid.server.management.MBeanOperation; +import org.apache.qpid.server.management.MBeanInvocationHandlerImpl; +import org.apache.qpid.server.security.auth.database.PrincipalDatabase; +import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; +import org.apache.qpid.server.security.access.management.UserManagement; +import org.apache.log4j.Logger; +import org.apache.commons.configuration.ConfigurationException; + +import javax.management.JMException; +import javax.management.remote.JMXPrincipal; +import javax.management.openmbean.TabularData; +import javax.management.openmbean.TabularDataSupport; +import javax.management.openmbean.TabularType; +import javax.management.openmbean.SimpleType; +import javax.management.openmbean.CompositeType; +import javax.management.openmbean.OpenType; +import javax.management.openmbean.OpenDataException; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.security.auth.login.AccountNotFoundException; +import javax.security.auth.Subject; +import java.io.File; +import java.io.FileInputStream; +import java.io.IOException; +import java.io.FileOutputStream; +import java.util.Properties; +import java.util.List; +import java.util.Enumeration; +import java.util.Set; +import java.util.concurrent.locks.ReentrantLock; +import java.security.Principal; +import java.security.AccessControlContext; +import java.security.AccessController; + +/** MBean class for AMQUserManagementMBean. It implements all the management features exposed for managing users. */ +@MBeanDescription("User Management Interface") +public class AMQUserManagementMBean extends AMQManagedObject implements UserManagement +{ + + private static final Logger _logger = Logger.getLogger(AMQUserManagementMBean.class); + + private PrincipalDatabase _principalDatabase; + private String _accessFileName; + private Properties _accessRights; + // private File _accessFile; + private ReentrantLock _accessRightsUpdate = new ReentrantLock(); + + // Setup for the TabularType + static TabularType _userlistDataType; // Datatype for representing User Lists + + static CompositeType _userDataType; // Composite type for representing User + static String[] _userItemNames = {"Username", "read", "write", "admin"}; + + static + { + String[] userItemDesc = {"Broker Login username", "Management Console Read Permission", + "Management Console Write Permission", "Management Console Admin Permission"}; + + OpenType[] userItemTypes = new OpenType[4]; // User item types. + userItemTypes[0] = SimpleType.STRING; // For Username + userItemTypes[1] = SimpleType.BOOLEAN; // For Rights - Read + userItemTypes[2] = SimpleType.BOOLEAN; // For Rights - Write + userItemTypes[3] = SimpleType.BOOLEAN; // For Rights - Admin + String[] userDataIndex = {_userItemNames[0]}; + + try + { + _userDataType = + new CompositeType("User", "User Data", _userItemNames, userItemDesc, userItemTypes); + + _userlistDataType = new TabularType("Users", "List of users", _userDataType, userDataIndex); + } + catch (OpenDataException e) + { + _logger.error("Tabular data setup for viewing users incorrect."); + _userlistDataType = null; + } + } + + + public AMQUserManagementMBean() throws JMException + { + super(UserManagement.class, UserManagement.TYPE); + } + + public String getObjectInstanceName() + { + return UserManagement.TYPE; + } + + public boolean setPassword(String username, char[] password) + { + try + { + //delegate password changes to the Principal Database + return _principalDatabase.updatePassword(new UsernamePrincipal(username), password); + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to set password of non-existant user'" + username + "'"); + return false; + } + } + + public boolean setRights(String username, boolean read, boolean write, boolean admin) + { + + if (_accessRights.get(username) == null) + { + // If the user doesn't exist in the user rights file check that they at least have an account. + if (_principalDatabase.getUser(username) == null) + { + return false; + } + } + + try + { + + _accessRightsUpdate.lock(); + + // Update the access rights + if (admin) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.ADMIN); + } + else + { + if (read | write) + { + if (read) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READONLY); + } + if (write) + { + _accessRights.put(username, MBeanInvocationHandlerImpl.READWRITE); + } + } + else + { + _accessRights.remove(username); + } + } + + saveAccessFile(); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + + return true; + } + + public boolean createUser(String username, char[] password, boolean read, boolean write, boolean admin) + { + if (_principalDatabase.createPrincipal(new UsernamePrincipal(username), password)) + { + _accessRights.put(username, ""); + + return setRights(username, read, write, admin); + } + + return false; + } + + public boolean deleteUser(String username) + { + + try + { + if (_principalDatabase.deletePrincipal(new UsernamePrincipal(username))) + { + try + { + _accessRightsUpdate.lock(); + + _accessRights.remove(username); + saveAccessFile(); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + return true; + } + } + catch (AccountNotFoundException e) + { + _logger.warn("Attempt to delete user (" + username + ") that doesn't exist"); + } + + return false; + } + + public boolean reloadData() + { + try + { + try + { + loadAccessFile(); + } + catch (ConfigurationException e) + { + _logger.info("Reload failed due to:" + e); + return false; + } + + // Reload successful + return true; + } + catch (IOException e) + { + _logger.info("Reload failed due to:" + e); + // Reload unsuccessful + return false; + } + } + + + @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.") + public TabularData viewUsers() + { + // Table of users + // Username(string), Access rights Read,Write,Admin(bool,bool,bool) + + if (_userlistDataType == null) + { + _logger.warn("TabluarData not setup correctly"); + return null; + } + + List users = _principalDatabase.getUsers(); + + TabularDataSupport userList = new TabularDataSupport(_userlistDataType); + + try + { + // Create the tabular list of message header contents + for (Principal user : users) + { + // Create header attributes list + + String rights = (String) _accessRights.get(user.getName()); + + Boolean read = false; + Boolean write = false; + Boolean admin = false; + + if (rights != null) + { + read = rights.equals(MBeanInvocationHandlerImpl.READONLY) + || rights.equals(MBeanInvocationHandlerImpl.READWRITE); + write = rights.equals(MBeanInvocationHandlerImpl.READWRITE); + admin = rights.equals(MBeanInvocationHandlerImpl.ADMIN); + } + + Object[] itemData = {user.getName(), read, write, admin}; + CompositeData messageData = new CompositeDataSupport(_userDataType, _userItemNames, itemData); + userList.put(messageData); + } + } + catch (OpenDataException e) + { + _logger.warn("Unable to create user list due to :" + e); + return null; + } + + return userList; + } + + /*** Broker Methods **/ + + /** + * setPrincipalDatabase + * + * @param database set The Database to use for user lookup + */ + public void setPrincipalDatabase(PrincipalDatabase database) + { + _principalDatabase = database; + } + + /** + * setAccessFile + * + * @param accessFile the file to use for updating. + * + * @throws java.io.IOException If the file cannot be accessed + * @throws org.apache.commons.configuration.ConfigurationException + * if checks on the file fail. + */ + public void setAccessFile(String accessFile) throws IOException, ConfigurationException + { + _accessFileName = accessFile; + + if (_accessFileName != null) + { + loadAccessFile(); + } + else + { + _logger.warn("Access rights file specified is null. Access rights not changed."); + } + } + + private void loadAccessFile() throws IOException, ConfigurationException + { + try + { + _accessRightsUpdate.lock(); + + Properties accessRights = new Properties(); + + File accessFile = new File(_accessFileName); + + if (!accessFile.exists()) + { + throw new ConfigurationException("'" + _accessFileName + "' does not exist"); + } + + if (!accessFile.canRead()) + { + throw new ConfigurationException("Cannot read '" + _accessFileName + "'."); + } + + if (!accessFile.canWrite()) + { + _logger.warn("Unable to write to access file '" + _accessFileName + "' changes will not be preserved."); + } + + accessRights.load(new FileInputStream(accessFile)); + checkAccessRights(accessRights); + setAccessRights(accessRights); + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + } + + private void checkAccessRights(Properties accessRights) + { + Enumeration values = accessRights.propertyNames(); + + while (values.hasMoreElements()) + { + String user = (String) values.nextElement(); + + if (_principalDatabase.getUser(user) == null) + { + _logger.warn("Access rights contains user '" + user + "' but there is no authentication data for that user"); + } + } + } + + private void saveAccessFile() + { + try + { + _accessRightsUpdate.lock(); + try + { + // remove old temporary file + File tmp = new File(_accessFileName + ".tmp"); + if (tmp.exists()) + { + tmp.delete(); + } + + //remove old backup + File old = new File(_accessFileName + ".old"); + if (old.exists()) + { + old.delete(); + } + + // Rename current file + File rights = new File(_accessFileName); + rights.renameTo(old); + + FileOutputStream output = new FileOutputStream(tmp); + _accessRights.store(output, "Generated by AMQUserManagementMBean Console : Last edited by user:" + getCurrentJMXUser()); + output.close(); + + // Rename new file to main file + tmp.renameTo(rights); + + // delete tmp + tmp.delete(); + } + catch (IOException e) + { + _logger.warn("Problem occured saving '" + _accessFileName + "' changes may not be preserved. :" + e); + } + } + finally + { + if (_accessRightsUpdate.isHeldByCurrentThread()) + { + _accessRightsUpdate.unlock(); + } + } + } + + private String getCurrentJMXUser() + { + AccessControlContext acc = AccessController.getContext(); + Subject subject = Subject.getSubject(acc); + + // Retrieve JMXPrincipal from Subject + Set principals = subject.getPrincipals(JMXPrincipal.class); + if (principals == null || principals.isEmpty()) + { + return "Unknown user principals were null"; + } + + Principal principal = principals.iterator().next(); + return principal.getName(); + } + + /** + * user=read user=write user=readwrite user=admin + * + * @param accessRights The properties list of access rights to process + */ + private void setAccessRights(Properties accessRights) + { + _logger.debug("Setting Access Rights:" + accessRights); + _accessRights = accessRights; + MBeanInvocationHandlerImpl.setAccessRights(_accessRights); + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java new file mode 100644 index 0000000000..658d7ebbd3 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/management/UserManagement.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.management; + +import org.apache.qpid.server.management.MBeanOperation; +import org.apache.qpid.server.management.MBeanOperationParameter; +import org.apache.qpid.server.management.MBeanAttribute; +import org.apache.qpid.AMQException; + +import javax.management.openmbean.TabularData; +import javax.management.openmbean.CompositeData; +import javax.management.JMException; +import javax.management.MBeanOperationInfo; +import java.io.IOException; + +public interface UserManagement +{ + String TYPE = "UserManagement"; + + //********** Operations *****************// + /** + * set password for user + * + * @param username The username to create + * @param password The password for the user + * + * @return The result of the operation + */ + @MBeanOperation(name = "setPassword", description = "Set password for user.", + impact = MBeanOperationInfo.ACTION) + boolean setPassword(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")char[] password); + + /** + * set rights for users with given details + * + * @param username The username to create + * @param read The set of permission to give the new user + * @param write The set of permission to give the new user + * @param admin The set of permission to give the new user + * + * @return The result of the operation + */ + @MBeanOperation(name = "setRights", description = "Set access rights for user.", + impact = MBeanOperationInfo.ACTION) + boolean setRights(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "readAndWrite", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); + + /** + * Create users with given details + * + * @param username The username to create + * @param password The password for the user + * @param read The set of permission to give the new user + * @param write The set of permission to give the new user + * @param admin The set of permission to give the new user + * + * @return The result of the operation + */ + @MBeanOperation(name = "createUser", description = "Create new user from system.", + impact = MBeanOperationInfo.ACTION) + boolean createUser(@MBeanOperationParameter(name = "username", description = "Username")String username, + @MBeanOperationParameter(name = "password", description = "Password")char[] password, + @MBeanOperationParameter(name = "read", description = "Administration read")boolean read, + @MBeanOperationParameter(name = "readAndWrite", description = "Administration write")boolean write, + @MBeanOperationParameter(name = "admin", description = "Administration rights")boolean admin); + + /** + * View users returns all the users that are currently available to the system. + * + * @param username The user to delete + * + * @return The result of the operation + */ + @MBeanOperation(name = "deleteUser", description = "Delete user from system.", + impact = MBeanOperationInfo.ACTION) + boolean deleteUser(@MBeanOperationParameter(name = "username", description = "Username")String username); + + + /** + * Reload the date from disk + * + * @return The result of the operation + */ + @MBeanOperation(name = "reloadData", description = "Reload the authentication file from disk.", + impact = MBeanOperationInfo.ACTION) + boolean reloadData(); + + /** + * View users returns all the users that are currently available to the system. + * + * @return a table of users data (Username, read, write, admin) + */ + @MBeanOperation(name = "viewUsers", description = "All users with access rights to the system.", + impact = MBeanOperationInfo.INFO) + TabularData viewUsers(); + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java new file mode 100644 index 0000000000..a51061aa0d --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -0,0 +1,68 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.plugins; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Accessable; +import org.apache.qpid.server.security.access.Permission; +import org.apache.commons.configuration.Configuration; + +public class AllowAll implements ACLPlugin +{ + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) + { + if (ACLManager.getLogger().isInfoEnabled()) + { + ACLManager.getLogger().info("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters))); + } + + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + + public static String accessablesToString(Object[] accessObject) + { + StringBuilder sb = new StringBuilder(); + + for (Object access : accessObject) + { + sb.append(access.getClass().getSimpleName() + ":" + access.toString() + ", "); + } + + return sb.delete(sb.length() - 2, sb.length()).toString(); + } + + public String getPluginName() + { + return "AllowAll"; + } + + public void setConfiguaration(Configuration config) + { + //no-op + } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java new file mode 100644 index 0000000000..80c125e737 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/DenyAll.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ +package org.apache.qpid.server.security.access.plugins; + +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.AMQConnectionException; +import org.apache.commons.configuration.Configuration; + +public class DenyAll implements ACLPlugin +{ + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException + { + + if (ACLManager.getLogger().isInfoEnabled()) + { + } + ACLManager.getLogger().info("Denying user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters))); + + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, "DenyAll Plugin"); + } + + public String getPluginName() + { + return "DenyAll"; + } + + public void setConfiguaration(Configuration config) + { + //no-op + } +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java new file mode 100644 index 0000000000..c09cdc33f5 --- /dev/null +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java @@ -0,0 +1,431 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + * + */ + +package org.apache.qpid.server.security.access.plugins; + +import org.apache.commons.configuration.Configuration; +import org.apache.log4j.Logger; +import org.apache.qpid.AMQConnectionException; +import org.apache.qpid.framing.AMQMethodBody; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicConsumeBody; +import org.apache.qpid.framing.BasicPublishBody; + +import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.protocol.AMQProtocolSession; +import org.apache.qpid.server.security.access.ACLManager; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.AccessResult; +import org.apache.qpid.server.security.access.Permission; +import org.apache.qpid.server.security.access.PrincipalPermissions; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +/** + * This uses the default + */ +public class SimpleXML implements ACLPlugin +{ + private static final Logger _logger = ACLManager.getLogger(); + + private Map _users; + + public SimpleXML() + { + _users = new ConcurrentHashMap(); + } + + public void setConfiguaration(Configuration config) + { + _logger.info("SimpleXML Configuration"); + + processConfig(config); + } + + private void processConfig(Configuration config) + { + processPublish(config); + + processConsume(config); + + processCreate(config); + } + + /** + * Publish format takes + * Exchange + Routing Key Pairs + * + * @param config XML Configuration + */ + private void processPublish(Configuration config) + { + Configuration publishConfig = config.subset("security.access_control_list.publish"); + + //Process users that have full publish permission + String[] users = publishConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.PUBLISH, user); + _logger.info("PUBLISH:GRANTED:USER:" + user + " for all destinations"); + } + + // Process exchange limited users + int exchangeCount = 0; + Configuration exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + //Get Exchange Name + AMQShortString exchangeName = new AMQShortString(exchangeConfig.getString("name")); + + //Get Routing Keys + int keyCount = 0; + Configuration routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")"); + + while (!routingkeyConfig.isEmpty()) + { + //Get RoutingKey Value + AMQShortString routingKeyValue = new AMQShortString(routingkeyConfig.getString("value")); + + //Apply Exchange + RoutingKey permissions to Users + users = routingkeyConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.PUBLISH, user, exchangeName, routingKeyValue); + _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange '" + exchangeName + "' for key '" + routingKeyValue + "'"); + } + + //Apply permissions to Groups + + // Check for more configs + keyCount++; + routingkeyConfig = exchangeConfig.subset("routing_keys.routing_key(" + keyCount + ")"); + } + + //Apply Exchange wide permissions to Users + users = exchangeConfig.getStringArray("exchange(" + exchangeCount + ").users.user"); + + for (String user : users) + { + grant(Permission.PUBLISH, user, exchangeName); + _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange:" + exchangeName); + } + + //Apply permissions to Groups + exchangeCount++; + exchangeConfig = publishConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + } + + private void grant(Permission permission, String user, Object... parameters) + { + PrincipalPermissions permissions = _users.get(user); + + if (permissions == null) + { + permissions = new PrincipalPermissions(user); + } + + _users.put(user, permissions); + permissions.grant(permission, parameters); + } + + private void processConsume(Configuration config) + { + Configuration consumeConfig = config.subset("security.access_control_list.consume"); + + // Process queue limited users + int queueCount = 0; + Configuration queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")"); + + while (!queueConfig.isEmpty()) + { + //Get queue Name + AMQShortString queueName = new AMQShortString(queueConfig.getString("name")); + // if there is no name then there may be a temporary element + boolean temporary = queueConfig.containsKey("temporary"); + boolean ownQueues = queueConfig.containsKey("own_queues"); + + //Process permissions for this queue + String[] users = queueConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CONSUME, user, queueName, temporary, ownQueues); + if (temporary) + { + if (ownQueues) + { + _logger.info("CONSUME:GRANTED:USER:" + user + " on temporary queues owned by user."); + } + else + { + _logger.info("CONSUME:GRANTED:USER:" + user + " on all temporary queues."); + } + } + else + { + _logger.info("CONSUME:GRANTED:USER:" + user + " on queue '" + queueName + "'"); + } + } + + //See if we have another config + queueCount++; + queueConfig = consumeConfig.subset("queues.queue(" + queueCount + ")"); + } + + // Process users that have full consume permission + String[] users = consumeConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CONSUME, user); + _logger.info("CONSUME:GRANTED:USER:" + user + " from all queues."); + } + } + + private void processCreate(Configuration config) + { + Configuration createConfig = config.subset("security.access_control_list.create"); + + // Process create permissions for queue creation + int queueCount = 0; + Configuration queueConfig = createConfig.subset("queues.queue(" + queueCount + ")"); + + while (!queueConfig.isEmpty()) + { + //Get queue Name + AMQShortString queueName = new AMQShortString(queueConfig.getString("name")); + + // if there is no name then there may be a temporary element + boolean temporary = queueConfig.containsKey("temporary"); + + int exchangeCount = 0; + Configuration exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + + AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name")); + AMQShortString routingKey = new AMQShortString(exchangeConfig.getString("routing_key")); + + //Process permissions for this queue + String[] users = exchangeConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CREATE, user, temporary, + (queueName.equals("") ? null : queueName), + (exchange.equals("") ? null : exchange), + (routingKey.equals("") ? null : routingKey)); + + _logger.info("CREATE :GRANTED:USER:" + user + " for " + + (queueName.equals("") ? "" : "queue '" + queueName + "' ") + + (exchange.equals("") ? "" : "exchange '" + exchange + "' ") + + (routingKey.equals("") ? "" : " rk '" + routingKey + "' ") + + (temporary ? " temporary:" + temporary : "")); + } + + //See if we have another config + exchangeCount++; + exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + + // Process users that are not bound to an exchange + String[] users = queueConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CREATE, user, temporary, queueName); + if (temporary) + { + _logger.info("CREATE :GRANTED:USER:" + user + " from temporary queues on any exchange."); + } + else + { + _logger.info("CREATE :GRANTED:USER:" + user + " from queue '" + queueName + "' on any exchange."); + } + } + + //See if we have another config + queueCount++; + queueConfig = createConfig.subset("queues.queue(" + queueCount + ")"); + } + + // Process create permissions for exchange creation + int exchangeCount = 0; + Configuration exchangeConfig = createConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + + while (!exchangeConfig.isEmpty()) + { + AMQShortString exchange = new AMQShortString(exchangeConfig.getString("name")); + AMQShortString clazz = new AMQShortString(exchangeConfig.getString("class")); + + //Process permissions for this queue + String[] users = exchangeConfig.getStringArray("users.user"); + for (String user : users) + { + grant(Permission.CREATE, user, exchange, clazz); + _logger.info("CREATE:GRANTED:USER:" + user + " for exchange '" + exchange + ":class:'" + clazz); + } + + //See if we have another config + exchangeCount++; + exchangeConfig = queueConfig.subset("exchanges.exchange(" + exchangeCount + ")"); + } + + // Process users that have full create permission + String[] users = createConfig.getStringArray("users.user"); + + for (String user : users) + { + grant(Permission.CREATE, user); + _logger.info("CREATE:GRANTED:USER:" + user + " from all queues & exchanges."); + } + + + } + + public String getPluginName() + { + return "Simple"; + } + + public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) throws AMQConnectionException + { + String error = ""; + + if (ACLManager.getLogger().isInfoEnabled()) + { + ACLManager.getLogger().info("Simple Authorisation processing user:" + session.getAuthorizedID() + " for :" + permission.toString() + + " on " + body.getClass().getSimpleName() + + (parameters == null || parameters.length == 0 ? "" : "-" + AllowAll.accessablesToString(parameters))); + } + + String username = session.getAuthorizedID().getName(); + + //Get the Users Permissions + PrincipalPermissions permissions = _users.get(username); + + _logger.warn("Processing :" + permission + " for:" + username + ":" + permissions+":"+parameters.length); + + if (permissions != null) + { + switch (permission) + { + case ACCESS: + _logger.warn("GRANTED:"+permission); + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + case BIND: // Body QueueDeclareBody - Parameters : Exchange, Queue, QueueName + // Body QueueBindBody - Paramters : Exchange, Queue, QueueName + if (parameters.length == 3) + { + // Parameters : Exchange, Queue, RoutingKey + if (permissions.authorise(Permission.BIND, body, parameters[0], parameters[1], parameters[2])) + { + _logger.warn("GRANTED:"+permission); + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + } + break; + case CONSUME: // Parameters : none + if (parameters.length == 1 && permissions.authorise(Permission.CONSUME, parameters[0])) + { + _logger.warn("GRANTED:"+permission); + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + break; + case CREATE: // Body : QueueDeclareBody | ExchangeDeclareBody - Parameters : none + if (permissions.authorise(Permission.CREATE, body)) + { + _logger.warn("GRANTED:"+permission); + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + break; + case PUBLISH: // Body : BasicPublishBody Parameters : exchange + if (parameters.length == 1 && parameters[0] instanceof Exchange) + { + if (permissions.authorise(Permission.PUBLISH, ((Exchange) parameters[0]).getName(), + ((BasicPublishBody) body).getRoutingKey())) + { + _logger.warn("GRANTED:"+permission); + return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + } + } + break; + case PURGE: + break; + case DELETE: + break; + case UNBIND: + break; + } + } + + _logger.warn("Access Denied for :" + permission + " for:" + username + ":" + permissions); + //todo potential refactor this ConnectionException Out of here + throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error); + } + +//todo use or lose +// if (accessObject instanceof VirtualHost) +// { +// VirtualHostAccess[] hosts = lookupVirtualHost(user.getName()); +// +// if (hosts != null) +// { +// for (VirtualHostAccess host : hosts) +// { +// if (accessObject.getAccessableName().equals(host.getVirtualHost())) +// { +// if (host.getAccessRights().allows(rights)) +// { +// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); +// } +// else +// { +// return new AccessResult(this, AccessResult.AccessStatus.REFUSED); +// } +// } +// } +// } +// } +// else if (accessObject instanceof AMQQueue) +// { +// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost()); +// +// if (queues != null) +// { +// for (String queue : queues) +// { +// if (accessObject.getAccessableName().equals(queue)) +// { +// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); +// } +// } +// } +// } + +// return new AccessResult(this, AccessResult.AccessStatus.REFUSED); +// } + +} diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java index 10adfdd9fc..348bccb4e9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/Base64MD5PasswordFilePrincipalDatabase.java @@ -24,7 +24,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.security.auth.sasl.AuthenticationProviderInitialiser; import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; import org.apache.qpid.server.security.auth.sasl.crammd5.CRAMMD5HashedInitialiser; -import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.qpid.server.security.access.management.AMQUserManagementMBean; import org.apache.commons.codec.binary.Base64; import org.apache.commons.codec.EncoderException; @@ -45,7 +45,6 @@ import java.util.LinkedList; import java.util.concurrent.locks.ReentrantLock; import java.security.Principal; import java.security.NoSuchAlgorithmException; -import java.security.MessageDigest; /** * Represents a user database where the account information is stored in a simple flat file. diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index 2d3f5e5131..c417d7e244 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -37,7 +37,7 @@ import org.apache.qpid.configuration.PropertyException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.security.auth.database.PrincipalDatabase; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AMQUserManagementMBean; +import org.apache.qpid.server.security.access.management.AMQUserManagementMBean; import org.apache.qpid.AMQException; import javax.management.JMException; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java deleted file mode 100644 index 5c372f6c2c..0000000000 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PlainPasswordVhostFilePrincipalDatabase.java +++ /dev/null @@ -1,130 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - * - */ -package org.apache.qpid.server.security.auth.database; - -import org.apache.log4j.Logger; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessResult; -import org.apache.qpid.server.security.access.AccessRights; -import org.apache.qpid.server.security.access.Accessable; -import org.apache.qpid.server.security.auth.sasl.UsernamePrincipal; -import org.apache.qpid.server.virtualhost.VirtualHost; - -import java.io.BufferedReader; -import java.io.FileReader; -import java.io.IOException; -import java.security.Principal; - -/** - * Represents a user database where the account information is stored in a simple flat file. - * - * The file is expected to be in the form: username:password username1:password1 ... usernamen:passwordn - * - * where a carriage return separates each username/password pair. Passwords are assumed to be in plain text. - */ -public class PlainPasswordVhostFilePrincipalDatabase extends PlainPasswordFilePrincipalDatabase implements AccessManager -{ - private static final Logger _logger = Logger.getLogger(PlainPasswordVhostFilePrincipalDatabase.class); - - /** - * Looks up the virtual hosts for a specified user in the password file. - * - * @param user The user to lookup - * - * @return a list of virtualhosts - */ - private String[] lookupVirtualHost(String user) - { - try - { - BufferedReader reader = null; - try - { - reader = new BufferedReader(new FileReader(_passwordFile)); - String line; - - while ((line = reader.readLine()) != null) - { - if (!line.startsWith("#")) - { - String[] result = _regexp.split(line); - if (result == null || result.length < 3) - { - continue; - } - - if (user.equals(result[0])) - { - return result[2].split(","); - } - } - } - return null; - } - finally - { - if (reader != null) - { - reader.close(); - } - } - } - catch (IOException ioe) - { - //ignore - } - return null; - } - - - public AccessResult isAuthorized(Accessable accessObject, String username) - { - return isAuthorized(accessObject, new UsernamePrincipal(username), AccessRights.Rights.READ); - } - - public AccessResult isAuthorized(Accessable accessObject, Principal user, AccessRights.Rights rights) - { - - if (accessObject instanceof VirtualHost) - { - String[] hosts = lookupVirtualHost(user.getName()); - - if (hosts != null) - { - for (String host : hosts) - { - if (accessObject.getAccessableName().equals(host)) - { - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); - } - } - } - } - - return new AccessResult(this, AccessResult.AccessStatus.REFUSED); - } - - public String getName() - { - return "PlainPasswordVhostFile"; - } - -} diff --git a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java index e9fa642175..0acfa84f31 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java @@ -35,8 +35,8 @@ import org.apache.qpid.server.security.auth.manager.AuthenticationManager; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.security.auth.database.PrincipalDatabaseManager; import org.apache.qpid.server.security.auth.database.PropertiesPrincipalDatabaseManager; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AllowAll; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.plugins.AllowAll; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.server.virtualhost.VirtualHostRegistry; @@ -48,7 +48,7 @@ public class NullApplicationRegistry extends ApplicationRegistry private VirtualHostRegistry _virtualHostRegistry; - private AccessManager _accessManager; + private ACLPlugin _accessManager; private PrincipalDatabaseManager _databaseManager; @@ -116,7 +116,7 @@ public class NullApplicationRegistry extends ApplicationRegistry return _virtualHostRegistry; } - public AccessManager getAccessManager() + public ACLPlugin getAccessManager() { return _accessManager; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index afe96bcd4f..90004a028c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -26,8 +26,8 @@ import org.apache.commons.configuration.Configuration; import org.apache.commons.configuration.PropertiesConfiguration; import org.apache.log4j.Logger; import org.apache.qpid.server.AMQBrokerManagerMBean; -import org.apache.qpid.server.security.access.AccessManager; -import org.apache.qpid.server.security.access.AccessManagerImpl; +import org.apache.qpid.server.security.access.ACLPlugin; +import org.apache.qpid.server.security.access.ACLManager; import org.apache.qpid.server.security.access.Accessable; import org.apache.qpid.server.security.auth.manager.PrincipalDatabaseAuthenticationManager; import org.apache.qpid.server.security.auth.manager.AuthenticationManager; @@ -69,7 +69,7 @@ public class VirtualHost implements Accessable private AuthenticationManager _authenticationManager; - private AccessManager _accessManager; + private ACLPlugin _accessManager; private final Timer _houseKeepingTimer = new Timer("Queue-housekeeping", true); @@ -168,7 +168,7 @@ public class VirtualHost implements Accessable _authenticationManager = new PrincipalDatabaseAuthenticationManager(name, hostConfig); - _accessManager = new AccessManagerImpl(name, hostConfig); + _accessManager = ACLManager.loadACLManager(name, hostConfig); _brokerMBean = new AMQBrokerManagerMBean(_virtualHostMBean); _brokerMBean.register(); @@ -279,7 +279,7 @@ public class VirtualHost implements Accessable return _authenticationManager; } - public AccessManager getAccessManager() + public ACLPlugin getAccessManager() { return _accessManager; } -- cgit v1.2.1 From 5a7d2f803b1a182559456c53d094ca00d5523503 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 14:50:04 +0000 Subject: QPID-809 Updated logging and removed an unnecessary printStackTrace(). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630865 13f79535-47bb-0310-9956-ffa450edef68 --- .../apache/qpid/server/handler/BasicPublishMethodHandler.java | 11 ++++------- .../auth/manager/PrincipalDatabaseAuthenticationManager.java | 8 -------- 2 files changed, 4 insertions(+), 15 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java index b0ac6c81f6..0f99a21ee5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicPublishMethodHandler.java @@ -35,9 +35,9 @@ import org.apache.qpid.server.state.AMQStateManager; import org.apache.qpid.server.state.StateAwareMethodListener; import org.apache.qpid.server.virtualhost.VirtualHost; -public class BasicPublishMethodHandler implements StateAwareMethodListener +public class BasicPublishMethodHandler implements StateAwareMethodListener { - private static final Logger _log = Logger.getLogger(BasicPublishMethodHandler.class); + private static final Logger _logger = Logger.getLogger(BasicPublishMethodHandler.class); private static final BasicPublishMethodHandler _instance = new BasicPublishMethodHandler(); @@ -54,12 +54,9 @@ public class BasicPublishMethodHandler implements StateAwareMethodListener Date: Mon, 25 Feb 2008 15:16:01 +0000 Subject: QPID-809 Updated logging and some whitespace changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630872 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/handler/QueueDeclareHandler.java | 13 +++++++------ .../registry/ConfigurationFileApplicationRegistry.java | 9 +++++---- 2 files changed, 12 insertions(+), 10 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java index b2797138c2..89a56b9a3c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/QueueDeclareHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -26,6 +26,7 @@ import java.util.UUID; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.configuration.Configured; + import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.configuration.Configurator; @@ -45,7 +46,7 @@ import org.apache.commons.configuration.Configuration; public class QueueDeclareHandler implements StateAwareMethodListener { - private static final Logger _log = Logger.getLogger(QueueDeclareHandler.class); + private static final Logger _logger = Logger.getLogger(QueueDeclareHandler.class); private static final QueueDeclareHandler _instance = new QueueDeclareHandler(); @@ -109,7 +110,7 @@ public class QueueDeclareHandler implements StateAwareMethodListener _virtualHosts = new ConcurrentHashMap(); private PluginManager _pluginManager; @@ -121,7 +122,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _managedObjectRegistry.start(); _pluginManager = new PluginManager(_configuration.getString("plugin-directory")); - + initialiseVirtualHosts(); } @@ -178,7 +179,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry { return getConfiguration().getList("virtualhosts.virtualhost.name"); } - + public PluginManager getPluginManager() { return _pluginManager; -- cgit v1.2.1 From 82d700652b4a55a24c99fa5649c13533425bbc19 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 25 Feb 2008 15:18:23 +0000 Subject: QPID-809 Updated logging and some whitespace changes. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@630873 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/handler/BasicConsumeMethodHandler.java | 23 +++++++++++----------- 1 file changed, 11 insertions(+), 12 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java index bacf34f871..7cd4afdb77 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicConsumeMethodHandler.java @@ -7,9 +7,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY @@ -24,7 +24,6 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQException; import org.apache.qpid.framing.*; import org.apache.qpid.protocol.AMQConstant; -import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.server.AMQChannel; import org.apache.qpid.server.ConsumerTagNotUniqueException; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -36,7 +35,7 @@ import org.apache.qpid.server.virtualhost.VirtualHost; public class BasicConsumeMethodHandler implements StateAwareMethodListener { - private static final Logger _log = Logger.getLogger(BasicConsumeMethodHandler.class); + private static final Logger _logger = Logger.getLogger(BasicConsumeMethodHandler.class); private static final BasicConsumeMethodHandler _instance = new BasicConsumeMethodHandler(); @@ -66,21 +65,21 @@ public class BasicConsumeMethodHandler implements StateAwareMethodListener Date: Thu, 28 Feb 2008 12:05:16 +0000 Subject: QPID-818 : Persistent Pub/Sub can get exception on acking message git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@631936 13f79535-47bb-0310-9956-ffa450edef68 --- .../broker/src/main/java/org/apache/qpid/server/ack/TxAck.java | 9 +-------- .../java/org/apache/qpid/server/ack/UnacknowledgedMessage.java | 10 ---------- .../src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 4 ---- 3 files changed, 1 insertion(+), 22 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index ac29998c2a..c62a7880a8 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -100,10 +100,9 @@ public class TxAck implements TxnOp //make persistent changes, i.e. dequeue and decrementReference for (UnacknowledgedMessage msg : _unacked) { - //msg.restoreTransientMessageData(); - //Message has been ack so discard it. This will dequeue and decrement the reference. msg.discard(storeContext); + } } @@ -115,7 +114,6 @@ public class TxAck implements TxnOp //in memory (persistent changes will be rolled back by store) for (UnacknowledgedMessage msg : _unacked) { - msg.clearTransientMessageData(); msg.getMessage().takeReference(); } } @@ -124,11 +122,6 @@ public class TxAck implements TxnOp { //remove the unacked messages from the channels map _map.remove(_unacked); - for (UnacknowledgedMessage msg : _unacked) - { - msg.clearTransientMessageData(); - } - } public void rollback(StoreContext storeContext) diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java index 40f5970cac..df7cecc940 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/UnacknowledgedMessage.java @@ -68,16 +68,6 @@ public class UnacknowledgedMessage entry.getMessage().decrementReference(storeContext); } - public void restoreTransientMessageData() throws AMQException - { - entry.getMessage().restoreTransientMessageData(); - } - - public void clearTransientMessageData() - { - entry.getMessage().clearTransientMessageData(); - } - public AMQMessage getMessage() { return entry.getMessage(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index 5e79ab46b0..dcc2becbc5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -707,10 +707,6 @@ public class AMQMessage _transientMessageData = transientMessageData; } - public void clearTransientMessageData() - { - _transientMessageData = null; - } public String toString() { -- cgit v1.2.1 From af4f34d70b44b9fc8ebbb8230706f9e7b2a0ec38 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 28 Feb 2008 16:51:12 +0000 Subject: QPID-821 Set default to false for MultiIO and Buffer Limiting. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632054 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/transport/ConnectorConfiguration.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java index 1dcbb02d5c..23aaf56876 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ConnectorConfiguration.java @@ -94,7 +94,7 @@ public class ConnectorConfiguration public String certType; @Configured(path = "connector.qpidnio", - defaultValue = "true") + defaultValue = "false") public boolean _multiThreadNIO; -- cgit v1.2.1 From 4ef2f5237550c996de8b7a0c8bed039719685756 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Thu, 28 Feb 2008 16:58:43 +0000 Subject: QPID-822 Set default to false for JMSXUserID. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@632055 13f79535-47bb-0310-9956-ffa450edef68 --- java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 3cb50d1d12..5542fbc9b6 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -118,7 +118,7 @@ public class AMQChannel private boolean _closing; @Configured(path = "advanced.enableJMSXUserID", - defaultValue = "true") + defaultValue = "false") public boolean ENABLE_JMSXUserID; -- cgit v1.2.1 From 19c9b4d979b80251fdd5517b8070e93cd6ad7b23 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 3 Mar 2008 20:13:55 +0000 Subject: QPID-828 : Stop transient message data being cleared while still delivering git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@633256 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/protocol/AMQMinaProtocolSession.java | 6 +++--- .../src/main/java/org/apache/qpid/server/queue/AMQMessage.java | 2 -- 2 files changed, 3 insertions(+), 5 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 6f40594cb4..ab503289ba 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -138,11 +138,9 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable catch (RuntimeException e) { e.printStackTrace(); - // throw e; + throw e; } - - // this(session, queueRegistry, exchangeRegistry, codecFactory, new AMQStateManager()); } public AMQMinaProtocolSession(IoSession session, VirtualHostRegistry virtualHostRegistry, AMQCodecFactory codecFactory, @@ -364,6 +362,8 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable listener.error(e); } + _logger.error("Unexpected exception while processing frame. Closing connection.", e); + _minaProtocolSession.close(); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java index dcc2becbc5..f501bc27d1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java @@ -657,8 +657,6 @@ public class AMQMessage // now that it has all been received, before we attempt delivery _txnContext.messageFullyReceived(isPersistent()); - _transientMessageData = null; - for (AMQQueue q : destinationQueues) { // Increment the references to this message for each queue delivery. -- cgit v1.2.1 From 70f17f743d8011d56fcb6fa6d8ef164c381c26a3 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Tue, 4 Mar 2008 13:01:54 +0000 Subject: QPID-831 : Remove incorrect references to getDeliveredToConsumer() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@633461 13f79535-47bb-0310-9956-ffa450edef68 --- .../queue/ConcurrentSelectorDeliveryManager.java | 24 ++++------------------ 1 file changed, 4 insertions(+), 20 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java index a61d41e33b..7dfcae95c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/ConcurrentSelectorDeliveryManager.java @@ -313,13 +313,11 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager { QueueEntry entry = currentQueue.next(); - if (!entry.getDeliveredToConsumer()) + if (subscription.hasInterest(entry)) { - if (subscription.hasInterest(entry)) - { - subscription.enqueueForPreDelivery(entry, false); - } + subscription.enqueueForPreDelivery(entry, false); } + } } @@ -509,9 +507,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager while (purgeMessage(entry, sub, purgeOnly)) { AMQMessage message = entry.getMessage(); - // if we are purging then ensure we mark this message taken for the current subscriber - // the current subscriber may be null in the case of a get or a purge but this is ok. -// boolean alreadyTaken = message.taken(_queue, sub); //remove the already taken message or expired QueueEntry removed = messages.poll(); @@ -519,7 +514,7 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager assert removed == entry; // if the message expired then the _totalMessageSize needs adjusting - if (message.expired(_queue) && !entry.getDeliveredToConsumer()) + if (message.expired(_queue) && !entry.taken(sub)) { _totalMessageSize.addAndGet(-entry.getSize()); @@ -867,17 +862,6 @@ public class ConcurrentSelectorDeliveryManager implements DeliveryManager for (Subscription sub : _subscriptions.getSubscriptions()) { - // stop if the message gets delivered whilst PreDelivering if we have a shared queue. - if (_queue.isShared() && entry.getDeliveredToConsumer()) - { - if (debugEnabled) - { - _log.debug(debugIdentity() + "Stopping PreDelivery as message(" + System.identityHashCode(entry) + - ") is already delivered."); - } - continue; - } - // Only give the message to those that want them. if (sub.hasInterest(entry)) { -- cgit v1.2.1 From 26ac1df0128425f904a3acbf4200de3442b20734 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 10 Mar 2008 13:04:21 +0000 Subject: QPID-847 : SelectorParserTest with disabled testLike() git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@635549 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/SelectorParserTest.java | 130 +++++++++++++++++++++ 1 file changed, 130 insertions(+) create mode 100644 java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java (limited to 'java/broker/src') diff --git a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java new file mode 100644 index 0000000000..dcac291ea1 --- /dev/null +++ b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java @@ -0,0 +1,130 @@ +package org.apache.qpid.server; + +import junit.framework.TestCase; +import org.apache.qpid.server.filter.JMSSelectorFilter; +import org.apache.qpid.AMQException;/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +public class SelectorParserTest extends TestCase +{ + public void testSelectorWithHyphen() + { + testPass("Cost = 2 AND \"property-with-hyphen\" = 'wibble'"); + } + + public void testLike() + { + //FAILING QPID-847 + System.err.println("testLike disabled"); + System.out.println("testLike disabled"); +// testPass("Cost LIKE 2"); + } + + public void testStringQuoted() + { + testPass("string = 'Test'"); + } + + public void testProperty() + { + testPass("prop1 = prop2"); + } + + public void testPropertyNames() + { + testPass("$min= TRUE AND _max= FALSE AND Prop_2 = true AND prop$3 = false"); + } + + public void testProtected() + { + testFail("NULL = 0 "); + testFail("TRUE = 0 "); + testFail("FALSE = 0 "); + testFail("NOT = 0 "); + testFail("AND = 0 "); + testFail("OR = 0 "); + testFail("BETWEEN = 0 "); + testFail("LIKE = 0 "); + testFail("IN = 0 "); + testFail("IS = 0 "); + testFail("ESCAPE = 0 "); + } + + + public void testBoolean() + { + testPass("min= TRUE AND max= FALSE "); + testPass("min= true AND max= false"); + } + + public void testDouble() + { + testPass("positive=31E2 AND negative=-31.4E3"); + testPass("min=" + Double.MIN_VALUE + " AND max=" + Double.MAX_VALUE); + } + + public void testLong() + { + testPass("minLong=" + Long.MIN_VALUE + "L AND maxLong=" + Long.MAX_VALUE + "L"); + } + + public void testInt() + { + testPass("minInt=" + Integer.MIN_VALUE + " AND maxInt=" + Integer.MAX_VALUE); + } + + public void testSigned() + { + testPass("negative=-42 AND positive=+42"); + } + + public void testOctal() + { + testPass("octal=042"); + } + + + private void testPass(String selector) + { + try + { + new JMSSelectorFilter(selector); + } + catch (AMQException e) + { + fail("Selector '" + selector + "' was not parsed :" + e.getMessage()); + } + } + + private void testFail(String selector) + { + try + { + new JMSSelectorFilter(selector); + fail("Selector '" + selector + "' was parsed "); + } + catch (AMQException e) + { + //normal path + } + } + +} -- cgit v1.2.1 From b8c0eb840e710f8763f765b839663fe76269fcca Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 10 Mar 2008 16:27:07 +0000 Subject: QIPD-841 : Problem with SimpleACLTest was that the ConfigurationFilePrincipalDatabaseManager was looking up the ApplicationRegistry for the configuration. Most of the time this is ok but during test tearDown/startUp sometimes the config is null on lookup. This method is just wrong anyway. the ConfigurationFilePrincipalDatabaseManager setup is the only setup that looksup the configuration from the AppRegistry. So adjusted so that it takes it as a parameter as other classes do. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@635590 13f79535-47bb-0310-9956-ffa450edef68 --- .../registry/ConfigurationFileApplicationRegistry.java | 6 +----- .../ConfigurationFilePrincipalDatabaseManager.java | 15 +++++++-------- 2 files changed, 8 insertions(+), 13 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java index e0fcaa208d..fef958000a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/registry/ConfigurationFileApplicationRegistry.java @@ -58,10 +58,6 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry private VirtualHostRegistry _virtualHostRegistry; - - //fixme Why is this not used. - private final Map _virtualHosts = new ConcurrentHashMap(); - private PluginManager _pluginManager; @@ -113,7 +109,7 @@ public class ConfigurationFileApplicationRegistry extends ApplicationRegistry _accessManager = ACLManager.loadACLManager("default", _configuration); - _databaseManager = new ConfigurationFilePrincipalDatabaseManager(); + _databaseManager = new ConfigurationFilePrincipalDatabaseManager(_configuration); _authenticationManager = new PrincipalDatabaseAuthenticationManager(null, null); diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java index c417d7e244..15c62a62e4 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/ConfigurationFilePrincipalDatabaseManager.java @@ -50,17 +50,16 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab Map _databases; - public ConfigurationFilePrincipalDatabaseManager() throws Exception + public ConfigurationFilePrincipalDatabaseManager(Configuration configuration) throws Exception { _logger.info("Initialising PrincipleDatabase authentication manager"); - _databases = initialisePrincipalDatabases(); + _databases = initialisePrincipalDatabases(configuration); } - private Map initialisePrincipalDatabases() throws Exception + private Map initialisePrincipalDatabases(Configuration configuration) throws Exception { - Configuration config = ApplicationRegistry.getInstance().getConfiguration(); - List databaseNames = config.getList(_base + ".name"); - List databaseClasses = config.getList(_base + ".class"); + List databaseNames = configuration.getList(_base + ".name"); + List databaseClasses = configuration.getList(_base + ".class"); Map databases = new HashMap(); if (databaseNames.size() == 0) @@ -85,7 +84,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab throw new Exception("Principal databases must implement the PrincipalDatabase interface"); } - initialisePrincipalDatabase((PrincipalDatabase) o, config, i); + initialisePrincipalDatabase((PrincipalDatabase) o, configuration, i); String name = databaseNames.get(i); if ((name == null) || (name.length() == 0)) @@ -200,7 +199,7 @@ public class ConfigurationFilePrincipalDatabaseManager implements PrincipalDatab } String jmxaccesssFile = null; - + try { jmxaccesssFile = PropertyUtils.replaceProperties(jmxaccesslist.get(0)); -- cgit v1.2.1 From 6aa10cc1aeb0ffbc6b02bf662b93eab879c517d7 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 10 Mar 2008 17:16:09 +0000 Subject: QPID-107 : Changes based on code review. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@635602 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/handler/ExchangeDeclareHandler.java | 7 +- .../qpid/server/handler/QueueDeclareHandler.java | 13 ++- .../server/protocol/AMQMinaProtocolSession.java | 6 +- .../security/access/PrincipalPermissions.java | 28 ++---- .../server/security/access/plugins/SimpleXML.java | 101 ++------------------- 5 files changed, 30 insertions(+), 125 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java index bc4476e969..9a98bc9659 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/ExchangeDeclareHandler.java @@ -59,8 +59,11 @@ public class ExchangeDeclareHandler implements StateAwareMethodListener>")) - { - // Binding to <> can not be programmed via ACLs due to '<','>' unable to be used in the XML - System.err.println("Binding on exchange <> not alowed via ACLs"); - } - AMQQueue bind_queueName = (AMQQueue) parameters[2]; AMQShortString routingKey = (AMQShortString) parameters[3]; diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java index c09cdc33f5..251f4e6330 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/SimpleXML.java @@ -46,9 +46,8 @@ import java.util.concurrent.ConcurrentHashMap; */ public class SimpleXML implements ACLPlugin { - private static final Logger _logger = ACLManager.getLogger(); - private Map _users; + private final AccessResult GRANTED = new AccessResult(this, AccessResult.AccessStatus.GRANTED); public SimpleXML() { @@ -57,8 +56,6 @@ public class SimpleXML implements ACLPlugin public void setConfiguaration(Configuration config) { - _logger.info("SimpleXML Configuration"); - processConfig(config); } @@ -87,7 +84,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.PUBLISH, user); - _logger.info("PUBLISH:GRANTED:USER:" + user + " for all destinations"); } // Process exchange limited users @@ -113,7 +109,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.PUBLISH, user, exchangeName, routingKeyValue); - _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange '" + exchangeName + "' for key '" + routingKeyValue + "'"); } //Apply permissions to Groups @@ -129,7 +124,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.PUBLISH, user, exchangeName); - _logger.info("PUBLISH:GRANTED:USER:" + user + " on Exchange:" + exchangeName); } //Apply permissions to Groups @@ -172,21 +166,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.CONSUME, user, queueName, temporary, ownQueues); - if (temporary) - { - if (ownQueues) - { - _logger.info("CONSUME:GRANTED:USER:" + user + " on temporary queues owned by user."); - } - else - { - _logger.info("CONSUME:GRANTED:USER:" + user + " on all temporary queues."); - } - } - else - { - _logger.info("CONSUME:GRANTED:USER:" + user + " on queue '" + queueName + "'"); - } } //See if we have another config @@ -200,7 +179,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.CONSUME, user); - _logger.info("CONSUME:GRANTED:USER:" + user + " from all queues."); } } @@ -237,12 +215,6 @@ public class SimpleXML implements ACLPlugin (queueName.equals("") ? null : queueName), (exchange.equals("") ? null : exchange), (routingKey.equals("") ? null : routingKey)); - - _logger.info("CREATE :GRANTED:USER:" + user + " for " - + (queueName.equals("") ? "" : "queue '" + queueName + "' ") - + (exchange.equals("") ? "" : "exchange '" + exchange + "' ") - + (routingKey.equals("") ? "" : " rk '" + routingKey + "' ") - + (temporary ? " temporary:" + temporary : "")); } //See if we have another config @@ -256,14 +228,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.CREATE, user, temporary, queueName); - if (temporary) - { - _logger.info("CREATE :GRANTED:USER:" + user + " from temporary queues on any exchange."); - } - else - { - _logger.info("CREATE :GRANTED:USER:" + user + " from queue '" + queueName + "' on any exchange."); - } } //See if we have another config @@ -285,7 +249,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.CREATE, user, exchange, clazz); - _logger.info("CREATE:GRANTED:USER:" + user + " for exchange '" + exchange + ":class:'" + clazz); } //See if we have another config @@ -299,7 +262,6 @@ public class SimpleXML implements ACLPlugin for (String user : users) { grant(Permission.CREATE, user); - _logger.info("CREATE:GRANTED:USER:" + user + " from all queues & exchanges."); } @@ -326,15 +288,12 @@ public class SimpleXML implements ACLPlugin //Get the Users Permissions PrincipalPermissions permissions = _users.get(username); - _logger.warn("Processing :" + permission + " for:" + username + ":" + permissions+":"+parameters.length); - if (permissions != null) { switch (permission) { case ACCESS: - _logger.warn("GRANTED:"+permission); - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + return GRANTED; case BIND: // Body QueueDeclareBody - Parameters : Exchange, Queue, QueueName // Body QueueBindBody - Paramters : Exchange, Queue, QueueName if (parameters.length == 3) @@ -342,23 +301,20 @@ public class SimpleXML implements ACLPlugin // Parameters : Exchange, Queue, RoutingKey if (permissions.authorise(Permission.BIND, body, parameters[0], parameters[1], parameters[2])) { - _logger.warn("GRANTED:"+permission); - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + return GRANTED; } } break; case CONSUME: // Parameters : none if (parameters.length == 1 && permissions.authorise(Permission.CONSUME, parameters[0])) { - _logger.warn("GRANTED:"+permission); - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + return GRANTED; } break; case CREATE: // Body : QueueDeclareBody | ExchangeDeclareBody - Parameters : none if (permissions.authorise(Permission.CREATE, body)) { - _logger.warn("GRANTED:"+permission); - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + return GRANTED; } break; case PUBLISH: // Body : BasicPublishBody Parameters : exchange @@ -367,8 +323,7 @@ public class SimpleXML implements ACLPlugin if (permissions.authorise(Permission.PUBLISH, ((Exchange) parameters[0]).getName(), ((BasicPublishBody) body).getRoutingKey())) { - _logger.warn("GRANTED:"+permission); - return new AccessResult(this, AccessResult.AccessStatus.GRANTED); + return GRANTED; } } break; @@ -381,51 +336,7 @@ public class SimpleXML implements ACLPlugin } } - _logger.warn("Access Denied for :" + permission + " for:" + username + ":" + permissions); //todo potential refactor this ConnectionException Out of here throw body.getConnectionException(AMQConstant.ACCESS_REFUSED, error); } - -//todo use or lose -// if (accessObject instanceof VirtualHost) -// { -// VirtualHostAccess[] hosts = lookupVirtualHost(user.getName()); -// -// if (hosts != null) -// { -// for (VirtualHostAccess host : hosts) -// { -// if (accessObject.getAccessableName().equals(host.getVirtualHost())) -// { -// if (host.getAccessRights().allows(rights)) -// { -// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); -// } -// else -// { -// return new AccessResult(this, AccessResult.AccessStatus.REFUSED); -// } -// } -// } -// } -// } -// else if (accessObject instanceof AMQQueue) -// { -// String[] queues = lookupQueue(username, ((AMQQueue) accessObject).getVirtualHost()); -// -// if (queues != null) -// { -// for (String queue : queues) -// { -// if (accessObject.getAccessableName().equals(queue)) -// { -// return new AccessResult(this, AccessResult.AccessStatus.GRANTED); -// } -// } -// } -// } - -// return new AccessResult(this, AccessResult.AccessStatus.REFUSED); -// } - } -- cgit v1.2.1 From 20cf766ec6465c52c56984780256791d97f481ac Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 10:46:40 +0000 Subject: QPID-592 : Parameterised the Read/Write buffer limits. On the broker extra config [read|write]BufferLimitSize on the client System properties qpid.[read|write].buffer.limit. All the defaults are 256k(262144). git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637047 13f79535-47bb-0310-9956-ffa450edef68 --- .../server/protocol/AMQPFastProtocolHandler.java | 23 ++++++++++++---------- 1 file changed, 13 insertions(+), 10 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java index db5d882f51..d8dbf97e49 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQPFastProtocolHandler.java @@ -29,10 +29,8 @@ import org.apache.mina.common.IoSession; import org.apache.mina.filter.ReadThrottleFilterBuilder; import org.apache.mina.filter.SSLFilter; import org.apache.mina.filter.WriteBufferLimitFilterBuilder; -import org.apache.mina.filter.codec.ProtocolCodecFilter; import org.apache.mina.filter.codec.QpidProtocolCodecFilter; import org.apache.mina.filter.executor.ExecutorFilter; -import org.apache.mina.transport.socket.nio.SocketSessionConfig; import org.apache.mina.util.SessionUtil; import org.apache.qpid.AMQException; import org.apache.qpid.codec.AMQCodecFactory; @@ -58,6 +56,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter private final IApplicationRegistry _applicationRegistry; + private static String DEFAULT_BUFFER_READ_LIMIT_SIZE = "262144"; + private static String DEFAULT_BUFFER_WRITE_LIMIT_SIZE = "262144"; + + private final int BUFFER_READ_LIMIT_SIZE; + private final int BUFFER_WRITE_LIMIT_SIZE; public AMQPFastProtocolHandler(Integer applicationRegistryInstance) { @@ -67,6 +70,11 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter public AMQPFastProtocolHandler(IApplicationRegistry applicationRegistry) { _applicationRegistry = applicationRegistry; + + // Read the configuration from the application registry + BUFFER_READ_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.readBufferLimitSize", DEFAULT_BUFFER_READ_LIMIT_SIZE)); + BUFFER_WRITE_LIMIT_SIZE = Integer.parseInt(_applicationRegistry.getConfiguration().getString("broker.connector.protectio.writeBufferLimitSize", DEFAULT_BUFFER_WRITE_LIMIT_SIZE)); + _logger.debug("AMQPFastProtocolHandler created"); } @@ -115,27 +123,22 @@ public class AMQPFastProtocolHandler extends IoHandlerAdapter } - if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio", false)) + if (ApplicationRegistry.getInstance().getConfiguration().getBoolean("broker.connector.protectio.enabled", false)) { try { // //Add IO Protection Filters IoFilterChain chain = protocolSession.getFilterChain(); - int buf_size = 32768; - if (protocolSession.getConfig() instanceof SocketSessionConfig) - { - buf_size = ((SocketSessionConfig) protocolSession.getConfig()).getReceiveBufferSize(); - } protocolSession.getFilterChain().addLast("tempExecutorFilterForFilterBuilder", new ExecutorFilter()); ReadThrottleFilterBuilder readfilter = new ReadThrottleFilterBuilder(); - readfilter.setMaximumConnectionBufferSize(buf_size); + readfilter.setMaximumConnectionBufferSize(BUFFER_READ_LIMIT_SIZE); readfilter.attach(chain); WriteBufferLimitFilterBuilder writefilter = new WriteBufferLimitFilterBuilder(); - writefilter.setMaximumConnectionBufferSize(buf_size * 2); + writefilter.setMaximumConnectionBufferSize(BUFFER_WRITE_LIMIT_SIZE); writefilter.attach(chain); protocolSession.getFilterChain().remove("tempExecutorFilterForFilterBuilder"); -- cgit v1.2.1 From c57e80ac3fe600ee2969b00c1d81bfd2cfe0cbd0 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 10:51:40 +0000 Subject: QPID-851 : Update to AMQChannel to prevent the memory leak when an autoclose consumer closes in a second thread before the register gets a chance to add the new session to the map. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637048 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 26 +++++++++++++++------- 1 file changed, 18 insertions(+), 8 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 5542fbc9b6..b04f60b1b0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -33,7 +33,7 @@ import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.ack.UnacknowledgedMessage; import org.apache.qpid.server.ack.UnacknowledgedMessageMap; import org.apache.qpid.server.ack.UnacknowledgedMessageMapImpl; -import org.apache.qpid.server.exchange.MessageRouter; +import org.apache.qpid.server.configuration.Configurator; import org.apache.qpid.server.exchange.NoRouteException; import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.protocol.AMQProtocolSession; @@ -43,17 +43,15 @@ import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.txn.LocalTransactionalContext; import org.apache.qpid.server.txn.NonTransactionalContext; import org.apache.qpid.server.txn.TransactionalContext; -import org.apache.qpid.server.configuration.Configurator; import java.util.Collection; -import java.util.HashMap; import java.util.HashSet; import java.util.LinkedList; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; public class AMQChannel { @@ -91,7 +89,7 @@ public class AMQChannel private AMQMessage _currentMessage; /** Maps from consumer tag to queue instance. Allows us to unsubscribe from a queue. */ - private final Map _consumerTag2QueueMap = new HashMap(); + private final Map _consumerTag2QueueMap = new ConcurrentHashMap(); private final MessageStore _messageStore; @@ -333,9 +331,21 @@ public class AMQChannel throw new ConsumerTagNotUniqueException(); } - queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + // We add before we register as the Async Delivery process may AutoClose the subscriber + // so calling _cT2QM.remove before we have done put which was after the register succeeded. + // So to keep things straight we put before the call and catch all exceptions from the register and tidy up. _consumerTag2QueueMap.put(tag, queue); + try + { + queue.registerProtocolSession(session, _channelId, tag, acks, filters, noLocal, exclusive); + } + catch (AMQException e) + { + _consumerTag2QueueMap.remove(tag); + throw e; + } + return tag; } @@ -822,7 +832,7 @@ public class AMQChannel { message.discard(_storeContext); message.setQueueDeleted(true); - + } catch (AMQException e) { @@ -967,7 +977,7 @@ public class AMQChannel public void processReturns(AMQProtocolSession session) throws AMQException { - if(!_returnMessages.isEmpty()) + if (!_returnMessages.isEmpty()) { for (RequiredDeliveryException bouncedMessage : _returnMessages) { -- cgit v1.2.1 From 9db506849ee57d0669f5df47d8a84c18e20dfb1d Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 11:36:42 +0000 Subject: QPID-852 : Updated broker so that it closes consumers when there are no messages on the queue. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637066 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/queue/AMQQueue.java | 2 ++ .../org/apache/qpid/server/queue/Subscription.java | 4 +-- .../apache/qpid/server/queue/SubscriptionImpl.java | 34 +++++++++++++++++----- 3 files changed, 31 insertions(+), 9 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 4a0121700c..7c6db0b4b3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -700,6 +700,8 @@ public class AMQQueue implements Managable, Comparable { _subscribers.setExclusive(true); } + + subscription.start(); } private boolean isExclusive() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java index a706098b71..96ce6743ec 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java @@ -45,8 +45,6 @@ public interface Subscription void enqueueForPreDelivery(QueueEntry msg, boolean deliverFirst); - boolean isAutoClose(); - void close(); boolean isClosed(); @@ -60,4 +58,6 @@ public interface Subscription Object getSendLock(); AMQChannel getChannel(); + + void start(); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java index 6e68b5637e..bde3ad8ec9 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java @@ -461,7 +461,7 @@ public class SubscriptionImpl implements Subscription } } - public boolean isAutoClose() + private boolean isAutoClose() { return _autoClose; } @@ -523,19 +523,24 @@ public class SubscriptionImpl implements Subscription { _logger.info("Closing autoclose subscription (" + debugIdentity() + "):" + this); - ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); - converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); - _sentClose = true; - - //fixme JIRA do this better + boolean unregisteredOK = false; try { - channel.unsubscribeConsumer(protocolSession, consumerTag); + unregisteredOK = channel.unsubscribeConsumer(protocolSession, consumerTag); } catch (AMQException e) { // Occurs if we cannot find the subscriber in the channel with protocolSession and consumerTag. + _logger.info("Unable to UnsubscribeConsumer :" + consumerTag +" so not going to send CancelOK."); } + + if (unregisteredOK) + { + ProtocolOutputConverter converter = protocolSession.getProtocolOutputConverter(); + converter.confirmConsumerAutoClose(channel.getChannelId(), consumerTag); + _sentClose = true; + } + } } @@ -666,4 +671,19 @@ public class SubscriptionImpl implements Subscription return channel; } + public void start() + { + //Check to see if we need to autoclose + if (filtersMessages()) + { + if (isAutoClose()) + { + if (_messages.isEmpty()) + { + autoclose(); + } + } + } + } + } -- cgit v1.2.1 From 81ff3589fdcd37de37c7bbe60b14dd4d1cddaf2a Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 14 Mar 2008 11:39:20 +0000 Subject: QPID-852 : Updated broker so that it closes consumers when there are no messages on the queue git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637067 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/AMQChannel.java | 11 ++++++++++- 1 file changed, 10 insertions(+), 1 deletion(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index b04f60b1b0..17b4fa5d65 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -349,7 +349,14 @@ public class AMQChannel return tag; } - public void unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException + /** + * Unsubscribe a consumer from a queue. + * @param session + * @param consumerTag + * @return true if the consumerTag had a mapped queue that could be unregistered. + * @throws AMQException + */ + public boolean unsubscribeConsumer(AMQProtocolSession session, AMQShortString consumerTag) throws AMQException { if (_log.isDebugEnabled()) { @@ -374,7 +381,9 @@ public class AMQChannel if (q != null) { q.unregisterProtocolSession(session, _channelId, consumerTag); + return true; } + return false; } /** -- cgit v1.2.1 From 9be19ae547c592691ac4f3f085d8ffcdc23c4390 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:22:23 +0000 Subject: QPID-847 : InvalidSelectorException. This was caused by the Broker now being more AMQP spec compliant than previously. Reverted the change in AMQMinaProtocolSession.java that is causing the issue but we need to correctly fix this issue in the client as the client is not AMQP spec compliant, even with the STRICT_AMQP flag. Updated SelectorTest.java with an additional test so we don't have the functionality reversion later. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637977 13f79535-47bb-0310-9956-ffa450edef68 --- .../qpid/server/protocol/AMQMinaProtocolSession.java | 14 +++++++++++--- 1 file changed, 11 insertions(+), 3 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java index 4c65a4682a..4267642b14 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/protocol/AMQMinaProtocolSession.java @@ -227,7 +227,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable return; } } - + try @@ -356,7 +356,7 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable } catch (Exception e) { - + for (AMQMethodListener listener : _frameListeners) { listener.error(e); @@ -548,7 +548,15 @@ public class AMQMinaProtocolSession implements AMQProtocolSession, Managable public void closeChannelOk(int channelId) { - removeChannel(channelId); + // todo QPID-847 - This is called from two lcoations ChannelCloseHandler and ChannelCloseOkHandler. + // When it is the CC_OK_Handler then it makes sence to remove the channel else we will leak memory. + // We do it from the Close Handler as we are sending the OK back to the client. + // While this is AMQP spec compliant. The Java client in the event of an IllegalArgumentException + // will send a close-ok.. Where we should call removeChannel. + // However, due to the poor exception handling on the client. The client-user will be notified of the + // InvalidArgument and if they then decide to close the session/connection then the there will be time + // for that to occur i.e. a new close method be sent before the exeption handling can mark the session closed. + //removeChannel(channelId); _closingChannelsList.remove(new Integer(channelId)); } -- cgit v1.2.1 From 5fd6a45806455ab7a83660e209e0aabeb66ce47c Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 17 Mar 2008 17:46:54 +0000 Subject: QPID-858 : Added cancel to the housekeeping thread when the vhost is closed. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@637991 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java | 4 ++++ 1 file changed, 4 insertions(+) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java index 90004a028c..3ff9b8c356 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java +++ b/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHost.java @@ -286,6 +286,10 @@ public class VirtualHost implements Accessable public void close() throws Exception { + if (_houseKeepingTimer != null) + { + _houseKeepingTimer.cancel(); + } if (_messageStore != null) { _messageStore.close(); -- cgit v1.2.1 From 366119ef91daf5400e15eccb29841641c680b98a Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Wed, 19 Mar 2008 17:43:37 +0000 Subject: QPID-847 : Fixed this test for the selectors git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@638951 13f79535-47bb-0310-9956-ffa450edef68 --- .../src/test/java/org/apache/qpid/server/SelectorParserTest.java | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java index dcac291ea1..a0304a7b01 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/SelectorParserTest.java @@ -31,11 +31,9 @@ public class SelectorParserTest extends TestCase } public void testLike() - { - //FAILING QPID-847 - System.err.println("testLike disabled"); - System.out.println("testLike disabled"); -// testPass("Cost LIKE 2"); + { + testFail("Cost LIKE 2"); + testPass("Cost LIKE 'Hello'"); } public void testStringQuoted() -- cgit v1.2.1 From 30c764cf4a22961f169bd5f6716acee85c49c431 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 24 Mar 2008 13:49:06 +0000 Subject: QPID-873 : Authentication Exception should be hard error; also NPE in PropertiesPrincipalDatabase when user not known git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@640417 13f79535-47bb-0310-9956-ffa450edef68 --- .../security/auth/database/PropertiesPrincipalDatabase.java | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java index 73d58ca489..c8a4add0f1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/auth/database/PropertiesPrincipalDatabase.java @@ -69,10 +69,14 @@ public class PropertiesPrincipalDatabase implements PrincipalDatabase { throw new IllegalArgumentException("principal must not be null"); } - char[] pwd = _users.getProperty(principal.getName()).toCharArray(); + + + + final String pwd = _users.getProperty(principal.getName()); + if (pwd != null) { - callback.setPassword(pwd); + callback.setPassword(pwd.toCharArray()); } else { -- cgit v1.2.1 From 5a414ecfb8ac75c9dee76ce9797c20b5a5034851 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Mon, 24 Mar 2008 15:03:00 +0000 Subject: QPID-875 : Dropped logging level to debug. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/M2.1@640434 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/security/access/plugins/AllowAll.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java index a51061aa0d..9b784069dd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java +++ b/java/broker/src/main/java/org/apache/qpid/server/security/access/plugins/AllowAll.java @@ -33,9 +33,9 @@ public class AllowAll implements ACLPlugin { public AccessResult authorise(AMQProtocolSession session, Permission permission, AMQMethodBody body, Object... parameters) { - if (ACLManager.getLogger().isInfoEnabled()) + if (ACLManager.getLogger().isDebugEnabled()) { - ACLManager.getLogger().info("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + ACLManager.getLogger().debug("Allowing user:" + session.getAuthorizedID() + " for :" + permission.toString() + " on " + body.getClass().getSimpleName() + (parameters == null || parameters.length == 0 ? "" : "-" + accessablesToString(parameters))); } -- cgit v1.2.1