diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-03-08 18:56:42 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-03-08 18:56:42 +0000 |
| commit | b2023145c2b88ee458429663536cbab7ddd8f3b0 (patch) | |
| tree | 259f7ed1e2372025c7a65338abc3a58ef6b88e74 /qpid/java/broker-core/src | |
| parent | 19b2671cbd4af77ac52c222605c09b06cab7ced6 (diff) | |
| download | qpid-python-b2023145c2b88ee458429663536cbab7ddd8f3b0.tar.gz | |
QPID-5617 : [Java Broker] restore or implement child added/removed notifications for configured objects within the vhost
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1575591 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
31 files changed, 318 insertions, 133 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 9fdae72188..0bba5c7967 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -94,6 +94,7 @@ public class BindingImpl //Perform ACLs queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + _queue.bindingCreated(this); _logSubject = new BindingLogSubject(_bindingKey,exchange,queue); getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 4cfcb01cf0..b15b01ede5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; -public interface Consumer +public interface ConsumerImpl { AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 8f6b008f9b..faf5a724f3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -36,9 +36,9 @@ public interface ConsumerTarget State getState(); - void consumerAdded(Consumer sub); + void consumerAdded(ConsumerImpl sub); - void consumerRemoved(Consumer sub); + void consumerRemoved(ConsumerImpl sub); void setStateListener(StateChangeListener<ConsumerTarget, State> listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 043fa4a664..d0b1670a45 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -30,9 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -132,8 +131,8 @@ public class FilterSupport public boolean matches(Filterable message) { - final Collection<? extends Consumer> consumers = _queue.getConsumers(); - for(Consumer c : consumers) + final Collection<? extends ConsumerImpl> consumers = _queue.getConsumers(); + for(ConsumerImpl c : consumers) { if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 1913f11ae1..1a70f80eab 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 8c69f47367..4ee47e05e9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -21,10 +21,9 @@ package org.apache.qpid.server.message; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -50,23 +49,23 @@ public interface MessageInstance boolean acquiredByConsumer(); - boolean isAcquiredBy(Consumer consumer); + boolean isAcquiredBy(ConsumerImpl consumer); void setRedelivered(); boolean isRedelivered(); - Consumer getDeliveredConsumer(); + ConsumerImpl getDeliveredConsumer(); void reject(); - boolean isRejectedBy(Consumer consumer); + boolean isRejectedBy(ConsumerImpl consumer); boolean getDeliveredToConsumer(); boolean expired(); - boolean acquire(Consumer sub); + boolean acquire(ConsumerImpl sub); int getMaximumDeliveryCount(); @@ -160,7 +159,7 @@ public interface MessageInstance } } - public final class ConsumerAcquiredState<C extends Consumer> extends EntryState + public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState { private final C _consumer; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index 382a9df7e9..7f6629e33d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -31,13 +31,13 @@ import java.util.EnumSet; public interface MessageSource extends TransactionLogResource, MessageNode { - Consumer addConsumer(ConsumerTarget target, FilterManager filters, + ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters, Class<? extends ServerMessage> messageClass, - String consumerName, EnumSet<Consumer.Option> options) + String consumerName, EnumSet<ConsumerImpl.Option> options) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused; - Collection<? extends Consumer> getConsumers(); + Collection<? extends ConsumerImpl> getConsumers(); void addConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener); @@ -47,8 +47,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode interface ConsumerRegistrationListener<Q extends MessageSource> { - void consumerAdded(Q source, Consumer consumer); - void consumerRemoved(Q queue, Consumer consumer); + void consumerAdded(Q source, ConsumerImpl consumer); + void consumerRemoved(Q queue, ConsumerImpl consumer); } /** diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java new file mode 100644 index 0000000000..7ca5c6da8f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.Consumer; + +public interface MessageSourceConsumer<X extends MessageSourceConsumer<X>> extends ConsumerImpl, Consumer<MessageSourceConsumer<X>> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 89d5622121..0da93fa784 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -29,9 +29,11 @@ import org.apache.qpid.server.model.*; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.stats.StatisticsGatherer; -final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter> +final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>, + SessionModelListener { private AMQConnectionModel _connection; @@ -61,6 +63,7 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { super(Collections.<String,Object>emptyMap(), createAttributes(conn), taskExecutor); _connection = conn; + conn.addSessionListener(this); } private static Map<String, Object> createAttributes(final AMQConnectionModel conn) @@ -146,33 +149,8 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter public Collection<Session> getSessions() { - List<AMQSessionModel> actualSessions = _connection.getSessionModels(); - synchronized (_sessionAdapters) { - Iterator<AMQSessionModel> iterator = _sessionAdapters.keySet().iterator(); - while(iterator.hasNext()) - { - AMQSessionModel session = iterator.next(); - if(!actualSessions.contains(session)) - { - SessionAdapter adapter = _sessionAdapters.get(session); - iterator.remove(); - - childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback. - } - } - - for(AMQSessionModel session : actualSessions) - { - if(!_sessionAdapters.containsKey(session)) - { - SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); - _sessionAdapters.put(session, adapter); - childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback. - } - } - return new ArrayList<Session>(_sessionAdapters.values()); } } @@ -186,7 +164,6 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { synchronized (_sessionAdapters) { - getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions. return _sessionAdapters.get(session); } } @@ -359,4 +336,31 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { return _connection.getSessionModels().size(); } + + @Override + public void sessionAdded(final AMQSessionModel<?, ?> session) + { + synchronized (_sessionAdapters) + { + if(!_sessionAdapters.containsKey(session)) + { + SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); + _sessionAdapters.put(session, adapter); + childAdded(adapter); + } + } + } + + @Override + public void sessionRemoved(final AMQSessionModel<?, ?> session) + { + synchronized (_sessionAdapters) + { + SessionAdapter adapter = _sessionAdapters.remove(session); + if(adapter != null) + { + childRemoved(adapter); + } + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 95e408ddf5..19d174a157 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -29,10 +29,11 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.*; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.QueueConsumer; final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter> @@ -41,7 +42,6 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl private AMQSessionModel _session; - private Map<Consumer, QueueConsumer> _consumerAdapters = new HashMap<Consumer, QueueConsumer>(); @ManagedAttributeField private int _channelId; @@ -50,6 +50,20 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl { super(Collections.<String,Object>emptyMap(),createAttributes(session), taskExecutor); _session = session; + _session.addConsumerListener(new ConsumerListener() + { + @Override + public void consumerAdded(final Consumer<?> consumer) + { + childAdded(consumer); + } + + @Override + public void consumerRemoved(final Consumer<?> consumer) + { + childRemoved(consumer); + } + }); } private static Map<String, Object> createAttributes(final AMQSessionModel session) @@ -75,15 +89,12 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl public Collection<org.apache.qpid.server.model.Consumer> getConsumers() { - synchronized (_consumerAdapters) - { - return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values()); - } + return (Collection<Consumer>) _session.getConsumers(); } public Collection<Publisher> getPublishers() { - return null; //TODO + return Collections.emptySet(); //TODO } public String setName(final String currentName, final String desiredName) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index d978705841..624e2cd280 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.transport.SessionListener; import java.net.SocketAddress; import java.security.Principal; @@ -98,4 +99,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends String getVirtualHostName(); String getRemoteContainerName(); + + void addSessionListener(SessionModelListener listener); + + void removeSessionListener(SessionModelListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index db92f6950d..e7c14a7a4d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.protocol; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.Deletable; @@ -86,4 +88,10 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo int getChannelId(); int getConsumerCount(); + + Collection<Consumer<?>> getConsumers(); + + void addConsumerListener(ConsumerListener listener); + + void removeConsumerListener(ConsumerListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java new file mode 100644 index 0000000000..7d797d05b8 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.server.model.Consumer; + +public interface ConsumerListener +{ + void consumerAdded(Consumer<?> consumer); + + void consumerRemoved(Consumer<?> consumer); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java new file mode 100644 index 0000000000..80a32682e2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface SessionModelListener +{ + void sessionAdded(AMQSessionModel<?,?> session); + void sessionRemoved(AMQSessionModel<?,?> session); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0db7d78576..b758365039 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -224,4 +225,6 @@ public interface AMQQueue<X extends AMQQueue<X>> long getUnacknowledgedMessages(); + + void bindingCreated(Binding<BindingImpl> binding); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 63dfdeb4b8..8e791cd326 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageSource; @@ -54,7 +55,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -99,6 +99,7 @@ public abstract class AbstractQueue }; private final VirtualHost _virtualHost; + private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); /** null means shared */ private String _description; @@ -682,7 +683,7 @@ public abstract class AbstractQueue final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<Consumer.Option> optionSet) + EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { @@ -762,13 +763,13 @@ public abstract class AbstractQueue throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); } - boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT); - if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) + if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL)) { optionSet = EnumSet.copyOf(optionSet); - optionSet.add(Consumer.Option.NO_LOCAL); + optionSet.add(ConsumerImpl.Option.NO_LOCAL); } if(exclusive && getConsumerCount() != 0) @@ -822,6 +823,9 @@ public abstract class AbstractQueue // TODO } + childAdded(consumer); + consumer.addChangeListener(_deletedChildListener); + deliverAsync(consumer); return consumer; @@ -2598,7 +2602,7 @@ public abstract class AbstractQueue case CONTAINER: case CONNECTION: AMQSessionModel session = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(session == null) { @@ -2624,7 +2628,7 @@ public abstract class AbstractQueue case CONTAINER: case PRINCIPAL: AMQConnectionModel con = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(con == null) { @@ -2652,7 +2656,7 @@ public abstract class AbstractQueue case NONE: case PRINCIPAL: String containerID = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(containerID == null) { @@ -2683,7 +2687,7 @@ public abstract class AbstractQueue case NONE: case CONTAINER: Principal principal = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(principal == null) { @@ -3079,4 +3083,43 @@ public abstract class AbstractQueue } } + @Override + public void bindingCreated(final Binding<BindingImpl> binding) + { + childAdded(binding); + binding.addChangeListener(_deletedChildListener); + } + + private class DeletedChildListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + AbstractQueue.this.childRemoved(object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index 827d44dfb3..8220993e03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 3bc6cbd625..01594e68a9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index c64291faec..592e596a70 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; @@ -160,7 +160,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - Consumer assignedSub = group.getConsumer(); + ConsumerImpl assignedSub = group.getConsumer(); if(assignedSub == sub) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 660d4ec5d6..5ffbc0dbaa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.Consumer; -public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer, org.apache.qpid.server.model.Consumer<X> +public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, Consumer<X> { void flushBatched(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 1e7c8030c1..d984cf8ab4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e3b56d25b7..b62e100eea 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; @@ -29,7 +30,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); if(acquired) @@ -217,7 +217,7 @@ public abstract class QueueEntryImpl implements QueueEntry return (_state instanceof ConsumerAcquiredState); } - public boolean isAcquiredBy(Consumer consumer) + public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; return state instanceof ConsumerAcquiredState @@ -295,7 +295,7 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Consumer consumer) + public boolean isRejectedBy(ConsumerImpl consumer) { if (_rejectedBy != null) // We have consumers that rejected this message diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 070b1990a6..d2687a33fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index ac0521b34e..dca3576827 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.security; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; @@ -354,7 +354,7 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void authoriseCreateConsumer(final Consumer consumer) + public void authoriseCreateConsumer(final ConsumerImpl consumer) { // TODO - remove cast to AMQQueue and allow testing of consumption from any MessageSource final AMQQueue queue = (AMQQueue) consumer.getMessageSource(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index eeb83a7148..0f5931b902 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -29,10 +29,13 @@ import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; @@ -41,9 +44,9 @@ import org.apache.qpid.server.util.StateChangeListener; import java.net.SocketAddress; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -175,12 +178,12 @@ public class MockConsumer implements ConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @@ -336,6 +339,24 @@ public class MockConsumer implements ConsumerTarget } @Override + public Collection<Consumer<?>> getConsumers() + { + return null; + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + + } + + @Override public void close(AMQConstant cause, String message) { } @@ -475,6 +496,18 @@ public class MockConsumer implements ConsumerTarget } @Override + public void addSessionListener(final SessionModelListener listener) + { + + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + + } + + @Override public String getClientVersion() { return null; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 97365479fa..913f400a5f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -33,6 +33,7 @@ import java.util.*; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -46,7 +47,6 @@ import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; import org.apache.qpid.server.consumer.MockConsumer; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -158,8 +158,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding a consumer adds it to the queue _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, @@ -195,8 +195,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", @@ -213,8 +213,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageA, null); _queue.enqueue(messageB, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", @@ -234,8 +234,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -281,8 +281,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -333,8 +333,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageC = createMessage(new Long(26)); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -386,12 +386,12 @@ abstract class AbstractQueueTestBase extends QpidTestCase QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); @@ -428,8 +428,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding an exclusive consumer adds it to the queue _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -454,8 +454,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase { _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); } catch (MessageSource.ExistingExclusiveConsumer e) @@ -468,14 +468,14 @@ abstract class AbstractQueueTestBase extends QpidTestCase // existing consumer _consumer.close(); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); try { _consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE)); } catch (MessageSource.ExistingConsumerPreventsExclusive e) @@ -492,7 +492,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage message = createMessage(id); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); _queue.enqueue(message, new Action<MessageInstance>() { @@ -677,11 +677,11 @@ abstract class AbstractQueueTestBase extends QpidTestCase // register the consumers testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); //check that no messages have been delivered to the //consumers during registration diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java index 4a762afbdb..3f5093a03a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.test.utils.QpidTestCase; import static org.mockito.Mockito.mock; @@ -146,7 +146,7 @@ public class ConsumerListTest extends QpidTestCase * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given * Consumer, or null if none is found. */ - private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub) + private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final ConsumerImpl sub) { QueueConsumerList.ConsumerNode node = list.getHead(); while (node != null && node.getConsumer() != sub) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 7c89f9855c..74a2262265 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -60,7 +60,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -76,7 +76,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } @@ -86,7 +86,7 @@ public class MockMessageInstance implements MessageInstance return false; } - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -116,7 +116,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 9c8449ce5f..09cec61909 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Collections; import junit.framework.AssertionFailedError; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.EnumSet; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.when; @@ -65,7 +65,7 @@ public class PriorityQueueTest extends AbstractQueueTestBase queue.enqueue(createMessage(9L, (byte) 0), null); // Register subscriber - queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class)); + queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class)); Thread.sleep(150); ArrayList<MessageInstance> msgs = getConsumer().getMessages(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 39d53637e4..c89d2abeae 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -24,7 +24,6 @@ import junit.framework.TestCase; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 4116be1954..1627de22f2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.message.MessageInstance; @@ -57,8 +57,8 @@ public class StandardQueueTest extends AbstractQueueTestBase ServerMessage message = createMessage(25l); QueueConsumer consumer = (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); getQueue().enqueue(message, null); consumer.close(); @@ -84,8 +84,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify adding an inactive consumer doesn't increase the count @@ -97,8 +97,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify behaviour in face of expected state changes: @@ -151,8 +151,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // put test messages into a queue putGivenNumberOfMessages(queue, 4); @@ -219,8 +219,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, entries.get(0).getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -335,7 +335,7 @@ public class StandardQueueTest extends AbstractQueueTestBase } @Override - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { if(_message.getMessageNumber() % 2 == 0) { |
