From 7e6f4149a73c4347475caa362f50e4e97d697e2d Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Fri, 21 Feb 2014 01:15:30 +0000 Subject: QPID-5567 : Move acl checks into the objects being created git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1570411 13f79535-47bb-0310-9956-ffa450edef68 --- .../org/apache/qpid/server/binding/Binding.java | 4 + .../org/apache/qpid/server/consumer/Consumer.java | 4 +- .../qpid/server/exchange/AbstractExchange.java | 6 +- .../server/exchange/DefaultExchangeFactory.java | 2 - .../apache/qpid/server/queue/OutOfOrderQueue.java | 1 - .../apache/qpid/server/queue/QueueConsumer.java | 436 +------------- .../qpid/server/queue/QueueConsumerImpl.java | 458 +++++++++++++++ .../apache/qpid/server/queue/SimpleAMQQueue.java | 120 ++-- .../org/apache/qpid/server/queue/SortedQueue.java | 1 - .../apache/qpid/server/queue/SubFlushRunner.java | 4 +- .../qpid/server/security/SecurityManager.java | 54 +- .../server/security/access/ObjectProperties.java | 11 +- .../server/virtualhost/AbstractVirtualHost.java | 9 - .../qpid/server/exchange/HeadersBindingTest.java | 16 +- .../logging/subjects/BindingLogSubjectTest.java | 9 +- .../logging/subjects/QueueLogSubjectTest.java | 9 +- .../qpid/server/queue/AMQQueueFactoryTest.java | 3 + .../qpid/server/queue/ConflationQueueListTest.java | 7 +- .../apache/qpid/server/queue/ConsumerListTest.java | 9 +- .../org/apache/qpid/server/queue/MockAMQQueue.java | 644 --------------------- .../qpid/server/queue/PriorityQueueListTest.java | 6 +- .../qpid/server/queue/QueueEntryImplTestBase.java | 16 +- .../server/queue/SimpleQueueEntryImplTest.java | 6 +- .../server/queue/SortedQueueEntryListTest.java | 6 +- .../qpid/server/queue/SortedQueueEntryTest.java | 6 +- .../server/queue/StandardQueueEntryListTest.java | 16 +- .../AbstractDurableConfigurationStoreTestCase.java | 5 + .../qpid/server/txn/AutoCommitTransactionTest.java | 30 +- .../qpid/server/txn/LocalTransactionTest.java | 64 +- 29 files changed, 718 insertions(+), 1244 deletions(-) create mode 100644 qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java delete mode 100644 qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java (limited to 'qpid/java/broker-core') diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java index 469a4bb9d0..1e3bafa39e 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/Binding.java @@ -48,6 +48,10 @@ public class Binding _queue = queue; _exchange = exchange; _arguments = arguments == null ? Collections.EMPTY_MAP : Collections.unmodifiableMap(arguments); + + //Perform ACLs + queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + } public UUID getId() diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java index e23eb397e1..410a0ba2af 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java @@ -21,6 +21,8 @@ package org.apache.qpid.server.consumer; import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; public interface Consumer @@ -48,7 +50,7 @@ public interface Consumer AMQSessionModel getSessionModel(); - void setNoLocal(boolean noLocal); + MessageSource getMessageSource(); long getId(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index cb5902d234..a6aad93b27 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -119,6 +119,9 @@ public abstract class AbstractExchange implements Exchange _id = id; _logSubject = new ExchangeLogSubject(this, this.getVirtualHost()); + // check ACL + host.getSecurityManager().authoriseCreateExchange(this); + // Log Exchange creation CurrentActor.get().message(ExchangeMessages.CREATED(getType().getType(), name, durable)); } @@ -624,9 +627,6 @@ public abstract class AbstractExchange implements Exchange arguments = Collections.emptyMap(); } - //Perform ACLs - _virtualHost.getSecurityManager().authoriseBind(AbstractExchange.this, queue, bindingKey); - if (id == null) { id = UUIDGenerator.generateBindingUUID(getName(), diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java index 21586c6a4a..80aa4fa49c 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultExchangeFactory.java @@ -116,8 +116,6 @@ public class DefaultExchangeFactory implements ExchangeFactory public Exchange createExchange(UUID id, String exchange, String type, boolean durable, boolean autoDelete) throws AMQUnknownExchangeType { - // Check access - _host.getSecurityManager().authoriseCreateExchange(autoDelete, durable, exchange, null, null, null, type); ExchangeType exchType = _exchangeClassMap.get(type); if (exchType == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java index 20e7edda92..28d926686f 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/OutOfOrderQueue.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.virtualhost.VirtualHost; import java.util.Map; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 45dc556732..f7654d63fa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -20,441 +20,41 @@ */ package org.apache.qpid.server.queue; -import org.apache.log4j.Logger; -import org.apache.qpid.server.filter.FilterManager; -import org.apache.qpid.server.logging.LogActor; -import org.apache.qpid.server.logging.LogSubject; -import org.apache.qpid.server.logging.actors.CurrentActor; -import org.apache.qpid.server.logging.actors.GenericActor; -import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.logging.subjects.QueueLogSubject; -import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; -import org.apache.qpid.server.util.StateChangeListener; - -import java.text.MessageFormat; -import java.util.EnumMap; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.Lock; -import java.util.concurrent.locks.ReentrantLock; - -import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; +import org.apache.qpid.server.message.MessageInstance; -class QueueConsumer, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements Consumer +interface QueueConsumer, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> extends Consumer { - public static enum State - { - ACTIVE, - SUSPENDED, - CLOSED - } - - private static final Logger _logger = Logger.getLogger(QueueConsumer.class); - private final AtomicBoolean _targetClosed = new AtomicBoolean(false); - private final AtomicBoolean _closed = new AtomicBoolean(false); - private final long _id; - private final Lock _stateChangeLock = new ReentrantLock(); - private final long _createTime = System.currentTimeMillis(); - private final MessageInstance.ConsumerAcquiredState> _owningState = new MessageInstance.ConsumerAcquiredState>(this); - private final boolean _acquires; - private final boolean _seesRequeues; - private final String _consumerName; - private final boolean _isTransient; - private final AtomicLong _deliveredCount = new AtomicLong(0); - private final AtomicLong _deliveredBytes = new AtomicLong(0); - private final FilterManager _filters; - private final Class _messageClass; - private final Object _sessionReference; - private Q _queue; - private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) - + "(UNKNOWN)" - + "] "); - - static final EnumMap STATE_MAP = - new EnumMap(ConsumerTarget.State.class); - - static - { - STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE); - STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED); - STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED); - } - - private final T _target; - private final SubFlushRunner _runner = new SubFlushRunner(this); - private volatile QueueContext _queueContext; - private StateChangeListener, State> _stateListener = new StateChangeListener, State>() - { - public void stateChanged(QueueConsumer sub, State oldState, State newState) - { - CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); - } - }; - private boolean _noLocal; - - QueueConsumer(final FilterManager filters, - final Class messageClass, - final boolean acquires, - final boolean seesRequeues, - final String consumerName, - final boolean isTransient, - T target) - { - _messageClass = messageClass; - _sessionReference = target.getSessionModel().getConnectionReference(); - _id = SUB_ID_GENERATOR.getAndIncrement(); - _filters = filters; - _acquires = acquires; - _seesRequeues = seesRequeues; - _consumerName = consumerName; - _isTransient = isTransient; - _target = target; - _target.setStateListener( - new StateChangeListener() - { - @Override - public void stateChanged(final ConsumerTarget object, - final ConsumerTarget.State oldState, - final ConsumerTarget.State newState) - { - targetStateChanged(oldState, newState); - } - }); - } - - private void targetStateChanged(final ConsumerTarget.State oldState, final ConsumerTarget.State newState) - { - if(oldState != newState) - { - if(newState == ConsumerTarget.State.CLOSED) - { - if(_targetClosed.compareAndSet(false,true)) - { - CurrentActor.get().message(getLogSubject(), SubscriptionMessages.CLOSE()); - } - } - else - { - CurrentActor.get().message(getLogSubject(),SubscriptionMessages.STATE(newState.toString())); - } - } - - if(newState == ConsumerTarget.State.CLOSED && oldState != newState && !_closed.get()) - { - close(); - } - final StateChangeListener, State> stateListener = getStateListener(); - if(stateListener != null) - { - stateListener.stateChanged(this, STATE_MAP.get(oldState), STATE_MAP.get(newState)); - } - } - - public T getTarget() - { - return _target; - } - - @Override - public void externalStateChange() - { - getQueue().deliverAsync(this); - } - - @Override - public long getUnacknowledgedBytes() - { - return _target.getUnacknowledgedBytes(); - } - - @Override - public long getUnacknowledgedMessages() - { - return _target.getUnacknowledgedMessages(); - } - - @Override - public AMQSessionModel getSessionModel() - { - return _target.getSessionModel(); - } - - @Override - public boolean isSuspended() - { - return _target.isSuspended(); - } - - @Override - public void close() - { - if(_closed.compareAndSet(false,true)) - { - getSendLock(); - try - { - _target.close(); - _target.consumerRemoved(this); - _queue.unregisterConsumer(this); - } - finally - { - releaseSendLock(); - } - - } - } - - void flushBatched() - { - _target.flushBatched(); - } - - void queueDeleted() - { - _target.queueDeleted(); - } - - boolean wouldSuspend(final MessageInstance msg) - { - return !_target.allocateCredit(msg.getMessage()); - } - - void restoreCredit(final MessageInstance queueEntry) - { - _target.restoreCredit(queueEntry.getMessage()); - } - - void queueEmpty() - { - _target.queueEmpty(); - } - - State getState() - { - return STATE_MAP.get(_target.getState()); - } - - public final Q getQueue() - { - return _queue; - } - - final void setQueue(Q queue, boolean exclusive) - { - if(getQueue() != null) - { - throw new IllegalStateException("Attempt to set queue for consumer " + this + " to " + queue + "when already set to " + getQueue()); - } - _queue = queue; - - String queueString = new QueueLogSubject(_queue).toLogString(); - - _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) - + "(" - // queueString is [vh(/{0})/qu({1}) ] so need to trim - // ^ ^^ - + queueString.substring(1,queueString.length() - 3) - + ")" - + "] "); - - - if (CurrentActor.get().getRootMessageLogger().isMessageEnabled(_logActor, _logActor.getLogSubject(), SubscriptionMessages.CREATE_LOG_HIERARCHY)) - { - final String filterLogString = getFilterLogString(); - CurrentActor.get().message(_logActor.getLogSubject(), SubscriptionMessages.CREATE(filterLogString, queue.isDurable() && exclusive, - filterLogString.length() > 0)); - } - } - - protected final LogSubject getLogSubject() - { - return _logActor.getLogSubject(); - } - - final LogActor getLogActor() - { - return _logActor; - } - - - @Override - public final void flush() - { - getQueue().flushConsumer(this); - } - - boolean resend(final E entry) - { - return getQueue().resend(entry, this); - } - - final SubFlushRunner getRunner() - { - return _runner; - } - - public final long getId() - { - return _id; - } - - public final StateChangeListener, State> getStateListener() - { - return _stateListener; - } + void flushBatched(); - public final void setStateListener(StateChangeListener, State> listener) - { - _stateListener = listener; - } + void queueEmpty(); - final QueueContext getQueueContext() - { - return _queueContext; - } + boolean hasInterest(E node); - final void setQueueContext(QueueContext queueContext) - { - _queueContext = queueContext; - } + boolean wouldSuspend(E entry); - public final boolean isActive() - { - return getState() == State.ACTIVE; - } + void restoreCredit(E entry); - public final boolean isClosed() - { - return getState() == State.CLOSED; - } + void send(E entry, boolean batch); - public final void setNoLocal(boolean noLocal) - { - _noLocal = noLocal; - } + void queueDeleted(); - public final boolean hasInterest(E entry) - { - //check that the message hasn't been rejected - if (entry.isRejectedBy(this)) - { + SubFlushRunner getRunner(); - return false; - } - - if (entry.getMessage().getClass() == _messageClass) - { - if(_noLocal) - { - Object connectionRef = entry.getMessage().getConnectionReference(); - if (connectionRef != null && connectionRef == _sessionReference) - { - return false; - } - } - } - else - { - // no interest in messages we can't convert - if(_messageClass != null && MessageConverterRegistry.getConverter(entry.getMessage().getClass(), - _messageClass)==null) - { - return false; - } - } - return (_filters == null) || _filters.allAllow(entry.asFilterable()); - } - - protected String getFilterLogString() - { - StringBuilder filterLogString = new StringBuilder(); - String delimiter = ", "; - boolean hasEntries = false; - if (_filters != null && _filters.hasFilters()) - { - filterLogString.append(_filters.toString()); - hasEntries = true; - } + Q getQueue(); - if (!acquires()) - { - if (hasEntries) - { - filterLogString.append(delimiter); - } - filterLogString.append("Browser"); - } + boolean resend(E e); - return filterLogString.toString(); - } - - public final boolean trySendLock() - { - return _stateChangeLock.tryLock(); - } - - public final void getSendLock() - { - _stateChangeLock.lock(); - } - - public final void releaseSendLock() - { - _stateChangeLock.unlock(); - } - - public final long getCreateTime() - { - return _createTime; - } - - final MessageInstance.ConsumerAcquiredState> getOwningState() - { - return _owningState; - } - - public final boolean acquires() - { - return _acquires; - } - - public final boolean seesRequeues() - { - return _seesRequeues; - } - - public final String getName() - { - return _consumerName; - } - - public final boolean isTransient() - { - return _isTransient; - } - - public final long getBytesOut() + public static enum State { - return _deliveredBytes.longValue(); + ACTIVE, + SUSPENDED, + CLOSED } - public final long getMessagesOut() - { - return _deliveredCount.longValue(); - } + MessageInstance.ConsumerAcquiredState> getOwningState(); - final void send(final E entry, final boolean batch) - { - _deliveredCount.incrementAndGet(); - ServerMessage message = entry.getMessage(); - _deliveredBytes.addAndGet(message.getSize()); - _target.send(entry, batch); - } + QueueContext getQueueContext(); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java new file mode 100644 index 0000000000..eeeab76656 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumerImpl.java @@ -0,0 +1,458 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.queue; + +import org.apache.log4j.Logger; +import org.apache.qpid.server.filter.FilterManager; +import org.apache.qpid.server.logging.LogActor; +import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.logging.actors.CurrentActor; +import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.SubscriptionMessages; +import org.apache.qpid.server.logging.subjects.QueueLogSubject; +import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.MessageConverterRegistry; +import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.util.StateChangeListener; + +import java.text.MessageFormat; +import java.util.EnumMap; +import java.util.EnumSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.Lock; +import java.util.concurrent.locks.ReentrantLock; + +import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; + +class QueueConsumerImpl, Q extends SimpleAMQQueue, L extends SimpleQueueEntryList> implements QueueConsumer +{ + + + private static final Logger _logger = Logger.getLogger(QueueConsumerImpl.class); + private final AtomicBoolean _targetClosed = new AtomicBoolean(false); + private final AtomicBoolean _closed = new AtomicBoolean(false); + private final long _id; + private final Lock _stateChangeLock = new ReentrantLock(); + private final long _createTime = System.currentTimeMillis(); + private final MessageInstance.ConsumerAcquiredState> _owningState = new MessageInstance.ConsumerAcquiredState>(this); + private final boolean _acquires; + private final boolean _seesRequeues; + private final String _consumerName; + private final boolean _isTransient; + private final AtomicLong _deliveredCount = new AtomicLong(0); + private final AtomicLong _deliveredBytes = new AtomicLong(0); + private final FilterManager _filters; + private final Class _messageClass; + private final Object _sessionReference; + private final Q _queue; + private GenericActor _logActor = new GenericActor("[" + MessageFormat.format(SUBSCRIPTION_FORMAT, getId()) + + "(UNKNOWN)" + + "] "); + + static final EnumMap STATE_MAP = + new EnumMap(ConsumerTarget.State.class); + + static + { + STATE_MAP.put(ConsumerTarget.State.ACTIVE, State.ACTIVE); + STATE_MAP.put(ConsumerTarget.State.SUSPENDED, State.SUSPENDED); + STATE_MAP.put(ConsumerTarget.State.CLOSED, State.CLOSED); + } + + private final T _target; + private final SubFlushRunner _runner = new SubFlushRunner(this); + private volatile QueueContext _queueContext; + private StateChangeListener, State> _stateListener = new StateChangeListener, State>() + { + public void stateChanged(QueueConsumerImpl sub, State oldState, State newState) + { + CurrentActor.get().message(SubscriptionMessages.STATE(newState.toString())); + } + }; + private final boolean _noLocal; + + QueueConsumerImpl(final Q queue, + T target, final String consumerName, + final FilterManager filters, + final Class messageClass, + EnumSet