diff options
Diffstat (limited to 'qpid/java')
53 files changed, 798 insertions, 243 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java index 9fdae72188..0bba5c7967 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/binding/BindingImpl.java @@ -94,6 +94,7 @@ public class BindingImpl //Perform ACLs queue.getVirtualHost().getSecurityManager().authoriseCreateBinding(this); + _queue.bindingCreated(this); _logSubject = new BindingLogSubject(_bindingKey,exchange,queue); getEventLogger().message(_logSubject, BindingMessages.CREATED(String.valueOf(getArguments()), getArguments() != null diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java index 4cfcb01cf0..b15b01ede5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/Consumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerImpl.java @@ -25,7 +25,7 @@ import java.util.concurrent.atomic.AtomicLong; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; -public interface Consumer +public interface ConsumerImpl { AtomicLong CONSUMER_NUMBER_GENERATOR = new AtomicLong(0); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java index 8f6b008f9b..faf5a724f3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/consumer/ConsumerTarget.java @@ -36,9 +36,9 @@ public interface ConsumerTarget State getState(); - void consumerAdded(Consumer sub); + void consumerAdded(ConsumerImpl sub); - void consumerRemoved(Consumer sub); + void consumerRemoved(ConsumerImpl sub); void setStateListener(StateChangeListener<ConsumerTarget, State> listener); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java index 043fa4a664..d0b1670a45 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/filter/FilterSupport.java @@ -30,9 +30,8 @@ import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; import org.apache.qpid.filter.selector.TokenMgrError; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; -import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; public class FilterSupport @@ -132,8 +131,8 @@ public class FilterSupport public boolean matches(Filterable message) { - final Collection<? extends Consumer> consumers = _queue.getConsumers(); - for(Consumer c : consumers) + final Collection<? extends ConsumerImpl> consumers = _queue.getConsumers(); + for(ConsumerImpl c : consumers) { if(c.getSessionModel().getConnectionReference() == message.getConnectionReference()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 1913f11ae1..1a70f80eab 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java index 8c69f47367..4ee47e05e9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageInstance.java @@ -21,10 +21,9 @@ package org.apache.qpid.server.message; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; -import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -50,23 +49,23 @@ public interface MessageInstance boolean acquiredByConsumer(); - boolean isAcquiredBy(Consumer consumer); + boolean isAcquiredBy(ConsumerImpl consumer); void setRedelivered(); boolean isRedelivered(); - Consumer getDeliveredConsumer(); + ConsumerImpl getDeliveredConsumer(); void reject(); - boolean isRejectedBy(Consumer consumer); + boolean isRejectedBy(ConsumerImpl consumer); boolean getDeliveredToConsumer(); boolean expired(); - boolean acquire(Consumer sub); + boolean acquire(ConsumerImpl sub); int getMaximumDeliveryCount(); @@ -160,7 +159,7 @@ public interface MessageInstance } } - public final class ConsumerAcquiredState<C extends Consumer> extends EntryState + public final class ConsumerAcquiredState<C extends ConsumerImpl> extends EntryState { private final C _consumer; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java index 382a9df7e9..7f6629e33d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSource.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.message; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -31,13 +31,13 @@ import java.util.EnumSet; public interface MessageSource extends TransactionLogResource, MessageNode { - Consumer addConsumer(ConsumerTarget target, FilterManager filters, + ConsumerImpl addConsumer(ConsumerTarget target, FilterManager filters, Class<? extends ServerMessage> messageClass, - String consumerName, EnumSet<Consumer.Option> options) + String consumerName, EnumSet<ConsumerImpl.Option> options) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused; - Collection<? extends Consumer> getConsumers(); + Collection<? extends ConsumerImpl> getConsumers(); void addConsumerRegistrationListener(ConsumerRegistrationListener<? super MessageSource> listener); @@ -47,8 +47,8 @@ public interface MessageSource extends TransactionLogResource, MessageNode interface ConsumerRegistrationListener<Q extends MessageSource> { - void consumerAdded(Q source, Consumer consumer); - void consumerRemoved(Q queue, Consumer consumer); + void consumerAdded(Q source, ConsumerImpl consumer); + void consumerRemoved(Q queue, ConsumerImpl consumer); } /** diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java new file mode 100644 index 0000000000..7ca5c6da8f --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageSourceConsumer.java @@ -0,0 +1,28 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.message; + +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.Consumer; + +public interface MessageSourceConsumer<X extends MessageSourceConsumer<X>> extends ConsumerImpl, Consumer<MessageSourceConsumer<X>> +{ +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java index 89d5622121..0da93fa784 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/ConnectionAdapter.java @@ -29,9 +29,11 @@ import org.apache.qpid.server.model.*; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.stats.StatisticsGatherer; -final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter> +final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter> implements Connection<ConnectionAdapter>, + SessionModelListener { private AMQConnectionModel _connection; @@ -61,6 +63,7 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { super(Collections.<String,Object>emptyMap(), createAttributes(conn), taskExecutor); _connection = conn; + conn.addSessionListener(this); } private static Map<String, Object> createAttributes(final AMQConnectionModel conn) @@ -146,33 +149,8 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter public Collection<Session> getSessions() { - List<AMQSessionModel> actualSessions = _connection.getSessionModels(); - synchronized (_sessionAdapters) { - Iterator<AMQSessionModel> iterator = _sessionAdapters.keySet().iterator(); - while(iterator.hasNext()) - { - AMQSessionModel session = iterator.next(); - if(!actualSessions.contains(session)) - { - SessionAdapter adapter = _sessionAdapters.get(session); - iterator.remove(); - - childRemoved(adapter); // Trigger corresponding ConfigurationChangeListener childRemoved() callback. - } - } - - for(AMQSessionModel session : actualSessions) - { - if(!_sessionAdapters.containsKey(session)) - { - SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); - _sessionAdapters.put(session, adapter); - childAdded(adapter); // Trigger corresponding ConfigurationChangeListener childAdded() callback. - } - } - return new ArrayList<Session>(_sessionAdapters.values()); } } @@ -186,7 +164,6 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { synchronized (_sessionAdapters) { - getSessions(); // Call getSessions() first to ensure _sessionAdapters state is up to date with actualSessions. return _sessionAdapters.get(session); } } @@ -359,4 +336,31 @@ final class ConnectionAdapter extends AbstractConfiguredObject<ConnectionAdapter { return _connection.getSessionModels().size(); } + + @Override + public void sessionAdded(final AMQSessionModel<?, ?> session) + { + synchronized (_sessionAdapters) + { + if(!_sessionAdapters.containsKey(session)) + { + SessionAdapter adapter = new SessionAdapter(session, getTaskExecutor()); + _sessionAdapters.put(session, adapter); + childAdded(adapter); + } + } + } + + @Override + public void sessionRemoved(final AMQSessionModel<?, ?> session) + { + synchronized (_sessionAdapters) + { + SessionAdapter adapter = _sessionAdapters.remove(session); + if(adapter != null) + { + childRemoved(adapter); + } + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java index 95e408ddf5..19d174a157 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/adapter/SessionAdapter.java @@ -29,10 +29,11 @@ import java.util.HashMap; import java.util.Map; import java.util.UUID; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.model.*; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.configuration.updater.TaskExecutor; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.queue.QueueConsumer; final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> implements Session<SessionAdapter> @@ -41,7 +42,6 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl private AMQSessionModel _session; - private Map<Consumer, QueueConsumer> _consumerAdapters = new HashMap<Consumer, QueueConsumer>(); @ManagedAttributeField private int _channelId; @@ -50,6 +50,20 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl { super(Collections.<String,Object>emptyMap(),createAttributes(session), taskExecutor); _session = session; + _session.addConsumerListener(new ConsumerListener() + { + @Override + public void consumerAdded(final Consumer<?> consumer) + { + childAdded(consumer); + } + + @Override + public void consumerRemoved(final Consumer<?> consumer) + { + childRemoved(consumer); + } + }); } private static Map<String, Object> createAttributes(final AMQSessionModel session) @@ -75,15 +89,12 @@ final class SessionAdapter extends AbstractConfiguredObject<SessionAdapter> impl public Collection<org.apache.qpid.server.model.Consumer> getConsumers() { - synchronized (_consumerAdapters) - { - return new ArrayList<org.apache.qpid.server.model.Consumer>(_consumerAdapters.values()); - } + return (Collection<Consumer>) _session.getConsumers(); } public Collection<Publisher> getPublishers() { - return null; //TODO + return Collections.emptySet(); //TODO } public String setName(final String currentName, final String desiredName) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java index d978705841..624e2cd280 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQConnectionModel.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.stats.StatisticsGatherer; import org.apache.qpid.server.util.Deletable; +import org.apache.qpid.transport.SessionListener; import java.net.SocketAddress; import java.security.Principal; @@ -98,4 +99,8 @@ public interface AMQConnectionModel<T extends AMQConnectionModel<T,S>, S extends String getVirtualHostName(); String getRemoteContainerName(); + + void addSessionListener(SessionModelListener listener); + + void removeSessionListener(SessionModelListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java index db92f6950d..e7c14a7a4d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/AMQSessionModel.java @@ -20,11 +20,13 @@ */ package org.apache.qpid.server.protocol; +import java.util.Collection; import java.util.UUID; import java.util.concurrent.ConcurrentSkipListSet; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.util.Deletable; @@ -86,4 +88,10 @@ public interface AMQSessionModel<T extends AMQSessionModel<T,C>, C extends AMQCo int getChannelId(); int getConsumerCount(); + + Collection<Consumer<?>> getConsumers(); + + void addConsumerListener(ConsumerListener listener); + + void removeConsumerListener(ConsumerListener listener); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java new file mode 100644 index 0000000000..7d797d05b8 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/ConsumerListener.java @@ -0,0 +1,30 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +import org.apache.qpid.server.model.Consumer; + +public interface ConsumerListener +{ + void consumerAdded(Consumer<?> consumer); + + void consumerRemoved(Consumer<?> consumer); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java new file mode 100644 index 0000000000..80a32682e2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/protocol/SessionModelListener.java @@ -0,0 +1,27 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol; + +public interface SessionModelListener +{ + void sessionAdded(AMQSessionModel<?,?> session); + void sessionRemoved(AMQSessionModel<?,?> session); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java index 0db7d78576..b758365039 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AMQQueue.java @@ -26,6 +26,7 @@ import org.apache.qpid.server.exchange.ExchangeReferrer; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; +import org.apache.qpid.server.model.Binding; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -224,4 +225,6 @@ public interface AMQQueue<X extends AMQQueue<X>> long getUnacknowledgedMessages(); + + void bindingCreated(Binding<BindingImpl> binding); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java index 63dfdeb4b8..8e791cd326 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AbstractQueue.java @@ -36,6 +36,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; import org.apache.qpid.server.configuration.IllegalConfigurationException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.MessageSource; @@ -54,7 +55,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; @@ -99,6 +99,7 @@ public abstract class AbstractQueue }; private final VirtualHost _virtualHost; + private final DeletedChildListener _deletedChildListener = new DeletedChildListener(); /** null means shared */ private String _description; @@ -682,7 +683,7 @@ public abstract class AbstractQueue final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - EnumSet<Consumer.Option> optionSet) + EnumSet<ConsumerImpl.Option> optionSet) throws ExistingExclusiveConsumer, ExistingConsumerPreventsExclusive, ConsumerAccessRefused { @@ -762,13 +763,13 @@ public abstract class AbstractQueue throw new ServerScopedRuntimeException("Unknown exclusivity policy " + _exclusivityPolicy); } - boolean exclusive = optionSet.contains(Consumer.Option.EXCLUSIVE); - boolean isTransient = optionSet.contains(Consumer.Option.TRANSIENT); + boolean exclusive = optionSet.contains(ConsumerImpl.Option.EXCLUSIVE); + boolean isTransient = optionSet.contains(ConsumerImpl.Option.TRANSIENT); - if(_noLocal && !optionSet.contains(Consumer.Option.NO_LOCAL)) + if(_noLocal && !optionSet.contains(ConsumerImpl.Option.NO_LOCAL)) { optionSet = EnumSet.copyOf(optionSet); - optionSet.add(Consumer.Option.NO_LOCAL); + optionSet.add(ConsumerImpl.Option.NO_LOCAL); } if(exclusive && getConsumerCount() != 0) @@ -822,6 +823,9 @@ public abstract class AbstractQueue // TODO } + childAdded(consumer); + consumer.addChangeListener(_deletedChildListener); + deliverAsync(consumer); return consumer; @@ -2598,7 +2602,7 @@ public abstract class AbstractQueue case CONTAINER: case CONNECTION: AMQSessionModel session = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(session == null) { @@ -2624,7 +2628,7 @@ public abstract class AbstractQueue case CONTAINER: case PRINCIPAL: AMQConnectionModel con = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(con == null) { @@ -2652,7 +2656,7 @@ public abstract class AbstractQueue case NONE: case PRINCIPAL: String containerID = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(containerID == null) { @@ -2683,7 +2687,7 @@ public abstract class AbstractQueue case NONE: case CONTAINER: Principal principal = null; - for(Consumer c : getConsumers()) + for(ConsumerImpl c : getConsumers()) { if(principal == null) { @@ -3079,4 +3083,43 @@ public abstract class AbstractQueue } } + @Override + public void bindingCreated(final Binding<BindingImpl> binding) + { + childAdded(binding); + binding.addChangeListener(_deletedChildListener); + } + + private class DeletedChildListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + AbstractQueue.this.childRemoved(object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java index 827d44dfb3..8220993e03 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/AssignedConsumerMessageGroupManager.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java index 3bc6cbd625..01594e68a9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/BaseQueue.java @@ -21,7 +21,6 @@ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java index c64291faec..592e596a70 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/DefinedGroupMessageGroupManager.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.util.StateChangeListener; import org.slf4j.Logger; @@ -160,7 +160,7 @@ public class DefinedGroupMessageGroupManager implements MessageGroupManager } } - Consumer assignedSub = group.getConsumer(); + ConsumerImpl assignedSub = group.getConsumer(); if(assignedSub == sub) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java index 660d4ec5d6..5ffbc0dbaa 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueConsumer.java @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; -import org.apache.qpid.server.consumer.ConsumerTarget; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; +import org.apache.qpid.server.model.Consumer; -public interface QueueConsumer<X extends QueueConsumer<X>> extends Consumer, org.apache.qpid.server.model.Consumer<X> +public interface QueueConsumer<X extends QueueConsumer<X>> extends ConsumerImpl, Consumer<X> { void flushBatched(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 1e7c8030c1..d984cf8ab4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.MessageInstance; public interface QueueEntry extends MessageInstance, Comparable<QueueEntry> diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index e3b56d25b7..b62e100eea 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.queue; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; @@ -29,7 +30,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -201,7 +201,7 @@ public abstract class QueueEntryImpl implements QueueEntry return acquired; } - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); if(acquired) @@ -217,7 +217,7 @@ public abstract class QueueEntryImpl implements QueueEntry return (_state instanceof ConsumerAcquiredState); } - public boolean isAcquiredBy(Consumer consumer) + public boolean isAcquiredBy(ConsumerImpl consumer) { EntryState state = _state; return state instanceof ConsumerAcquiredState @@ -295,7 +295,7 @@ public abstract class QueueEntryImpl implements QueueEntry } } - public boolean isRejectedBy(Consumer consumer) + public boolean isRejectedBy(ConsumerImpl consumer) { if (_rejectedBy != null) // We have consumers that rejected this message diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java index 070b1990a6..d2687a33fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryList.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.message.ServerMessage; public interface QueueEntryList diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java index ac0521b34e..dca3576827 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/security/SecurityManager.java @@ -21,7 +21,7 @@ package org.apache.qpid.server.security; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; @@ -354,7 +354,7 @@ public class SecurityManager implements ConfigurationChangeListener } } - public void authoriseCreateConsumer(final Consumer consumer) + public void authoriseCreateConsumer(final ConsumerImpl consumer) { // TODO - remove cast to AMQQueue and allow testing of consumption from any MessageSource final AMQQueue queue = (AMQQueue) consumer.getMessageSource(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java index eeb83a7148..0f5931b902 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/consumer/MockConsumer.java @@ -29,10 +29,13 @@ import org.apache.qpid.server.filter.SimpleFilterManager; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Consumer; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.stats.StatisticsCounter; import org.apache.qpid.server.util.Action; @@ -41,9 +44,9 @@ import org.apache.qpid.server.util.StateChangeListener; import java.net.SocketAddress; import java.security.Principal; import java.util.ArrayList; +import java.util.Collection; import java.util.List; import java.util.UUID; -import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; @@ -175,12 +178,12 @@ public class MockConsumer implements ConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @@ -336,6 +339,24 @@ public class MockConsumer implements ConsumerTarget } @Override + public Collection<Consumer<?>> getConsumers() + { + return null; + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + + } + + @Override public void close(AMQConstant cause, String message) { } @@ -475,6 +496,18 @@ public class MockConsumer implements ConsumerTarget } @Override + public void addSessionListener(final SessionModelListener listener) + { + + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + + } + + @Override public String getClientVersion() { return null; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java index 97365479fa..913f400a5f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/AbstractQueueTestBase.java @@ -33,6 +33,7 @@ import java.util.*; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.LifetimePolicy; import org.apache.qpid.server.model.Queue; @@ -46,7 +47,6 @@ import org.apache.qpid.server.model.QueueNotificationListener; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AbstractQueue.QueueEntryFilter; import org.apache.qpid.server.consumer.MockConsumer; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.BrokerTestHelper; import org.apache.qpid.server.virtualhost.VirtualHost; @@ -158,8 +158,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding a consumer adds it to the queue _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); assertEquals("Queue does not have active consumer", 1, @@ -195,8 +195,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _queue.enqueue(messageA, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageA, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after an enqueue", @@ -213,8 +213,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _queue.enqueue(messageA, null); _queue.enqueue(messageB, null); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); Thread.sleep(150); assertEquals(messageB, _consumer.getQueueContext().getLastSeenEntry().getMessage()); assertNull("There should be no releasedEntry after enqueues", @@ -234,8 +234,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -281,8 +281,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageA = createMessage(new Long(24)); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -333,8 +333,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage messageC = createMessage(new Long(26)); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); EntryListAddingAction postEnqueueAction = new EntryListAddingAction(queueEntries); @@ -386,12 +386,12 @@ abstract class AbstractQueueTestBase extends QpidTestCase QueueConsumer consumer1 = (QueueConsumer) _queue.addConsumer(target1, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); QueueConsumer consumer2 = (QueueConsumer) _queue.addConsumer(target2, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); final ArrayList<QueueEntry> queueEntries = new ArrayList<QueueEntry>(); @@ -428,8 +428,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase // Check adding an exclusive consumer adds it to the queue _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Queue does not have consumer", 1, _queue.getConsumerCount()); @@ -454,8 +454,8 @@ abstract class AbstractQueueTestBase extends QpidTestCase { _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); } catch (MessageSource.ExistingExclusiveConsumer e) @@ -468,14 +468,14 @@ abstract class AbstractQueueTestBase extends QpidTestCase // existing consumer _consumer.close(); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); try { _consumer = (QueueConsumer<?>) _queue.addConsumer(subB, null, messageA.getClass(), "test", - EnumSet.of(Consumer.Option.EXCLUSIVE)); + EnumSet.of(ConsumerImpl.Option.EXCLUSIVE)); } catch (MessageSource.ExistingConsumerPreventsExclusive e) @@ -492,7 +492,7 @@ abstract class AbstractQueueTestBase extends QpidTestCase ServerMessage message = createMessage(id); _consumer = (QueueConsumer<?>) _queue.addConsumer(_consumerTarget, null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); _queue.enqueue(message, new Action<MessageInstance>() { @@ -677,11 +677,11 @@ abstract class AbstractQueueTestBase extends QpidTestCase // register the consumers testQueue.addConsumer(sub1, sub1.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub2, sub2.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); testQueue.addConsumer(sub3, sub3.getFilters(), msg1.getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, ConsumerImpl.Option.SEES_REQUEUES)); //check that no messages have been delivered to the //consumers during registration diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java index 4a762afbdb..3f5093a03a 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/ConsumerListTest.java @@ -21,8 +21,8 @@ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.test.utils.QpidTestCase; import static org.mockito.Mockito.mock; @@ -146,7 +146,7 @@ public class ConsumerListTest extends QpidTestCase * Traverses the list nodes in a non-mutating fashion, returning the first node which matches the given * Consumer, or null if none is found. */ - private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final Consumer sub) + private QueueConsumerList.ConsumerNode getNodeForConsumer(final QueueConsumerList list, final ConsumerImpl sub) { QueueConsumerList.ConsumerNode node = list.getHead(); while (node != null && node.getConsumer() != sub) diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java index 7c89f9855c..74a2262265 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/MockMessageInstance.java @@ -20,13 +20,13 @@ */ package org.apache.qpid.server.queue; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.StateChangeListener; @@ -60,7 +60,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -76,7 +76,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } @@ -86,7 +86,7 @@ public class MockMessageInstance implements MessageInstance return false; } - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -116,7 +116,7 @@ public class MockMessageInstance implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java index 9c8449ce5f..09cec61909 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/PriorityQueueTest.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.queue; import java.util.Collections; import junit.framework.AssertionFailedError; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; @@ -31,7 +32,6 @@ import java.util.ArrayList; import java.util.EnumSet; import org.apache.qpid.server.model.Queue; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.when; @@ -65,7 +65,7 @@ public class PriorityQueueTest extends AbstractQueueTestBase queue.enqueue(createMessage(9L, (byte) 0), null); // Register subscriber - queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(Consumer.Option.class)); + queue.addConsumer(getConsumer(), null, null, "test", EnumSet.noneOf(ConsumerImpl.Option.class)); Thread.sleep(150); ArrayList<MessageInstance> msgs = getConsumer().getMessages(); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java index 39d53637e4..c89d2abeae 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/QueueEntryListTestBase.java @@ -24,7 +24,6 @@ import junit.framework.TestCase; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java index 4116be1954..1627de22f2 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/queue/StandardQueueTest.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.queue; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.consumer.MockConsumer; import org.apache.qpid.server.message.MessageInstance; @@ -57,8 +57,8 @@ public class StandardQueueTest extends AbstractQueueTestBase ServerMessage message = createMessage(25l); QueueConsumer consumer = (QueueConsumer) getQueue().addConsumer(getConsumerTarget(), null, message.getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); getQueue().enqueue(message, null); consumer.close(); @@ -84,8 +84,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify adding an inactive consumer doesn't increase the count @@ -97,8 +97,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); assertEquals("Unexpected active consumer count", 1, queue.getConsumerCountWithCredit()); //verify behaviour in face of expected state changes: @@ -151,8 +151,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, createMessage(-1l).getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // put test messages into a queue putGivenNumberOfMessages(queue, 4); @@ -219,8 +219,8 @@ public class StandardQueueTest extends AbstractQueueTestBase null, entries.get(0).getMessage().getClass(), "test", - EnumSet.of(Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES)); + EnumSet.of(ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES)); // process queue testQueue.processQueue(new QueueRunner(testQueue) @@ -335,7 +335,7 @@ public class StandardQueueTest extends AbstractQueueTestBase } @Override - public boolean acquire(Consumer sub) + public boolean acquire(ConsumerImpl sub) { if(_message.getMessageNumber() % 2 == 0) { diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index eeafb30642..a3fabf076c 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,6 +20,7 @@ */ package org.apache.qpid.server.protocol.v0_10; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; @@ -31,7 +32,6 @@ import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; @@ -66,7 +66,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC private final Map<String, Object> _arguments; private int _deferredMessageCredit; private long _deferredSizeCredit; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_0_10(ServerSession session, @@ -90,7 +90,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC _name = name; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -105,7 +105,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { consumer.getSendLock(); @@ -569,13 +569,13 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java index a3a80415ac..5e899aa635 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerConnection.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.AuthorizationHolder; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -80,6 +81,9 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S private final CopyOnWriteArrayList<Action<? super ServerConnection>> _taskList = new CopyOnWriteArrayList<Action<? super ServerConnection>>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); + private volatile boolean _stopped; public ServerConnection(final long connectionId, Broker broker) @@ -383,6 +387,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S public synchronized void registerSession(final Session ssn) { super.registerSession(ssn); + sessionAdded((ServerSession)ssn); if(_blocking) { ((ServerSession)ssn).block(); @@ -392,6 +397,7 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S @Override public synchronized void removeSession(final Session ssn) { + sessionRemoved((ServerSession)ssn); super.removeSession(ssn); } @@ -552,6 +558,35 @@ public class ServerConnection extends Connection implements AMQConnectionModel<S } @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + + @Override public String getClientVersion() { return getConnectionDelegate().getClientVersion(); diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java index c2eacfe6e8..0bb3008d13 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSession.java @@ -45,6 +45,12 @@ import java.util.concurrent.atomic.AtomicReference; import javax.security.auth.Subject; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.server.TransactionTimeoutHelper; @@ -108,6 +114,9 @@ public class ServerSession extends Session private ChannelLogSubject _logSubject; private final AtomicInteger _outstandingCredit = new AtomicInteger(UNLIMITED_CREDIT); private final CheckCapacityAction _checkCapacityAction = new CheckCapacityAction(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + public static interface MessageDispositionChangeListener { @@ -133,6 +142,7 @@ public class ServerSession extends Session private final AtomicLong _txnCount = new AtomicLong(0); private Map<String, ConsumerTarget_0_10> _subscriptions = new ConcurrentHashMap<String, ConsumerTarget_0_10>(); + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); private final List<Action<? super ServerSession>> _taskList = new CopyOnWriteArrayList<Action<? super ServerSession>>(); @@ -458,6 +468,18 @@ public class ServerSession extends Session _subscriptions.put(destination == null ? NULL_DESTINATION : destination, sub); } + + public void register(final ConsumerImpl consumerImpl) + { + if(consumerImpl instanceof Consumer<?>) + { + final Consumer<?> consumer = (Consumer<?>) consumerImpl; + _consumers.add(consumer); + consumer.addChangeListener(_consumerClosedListener); + consumerAdded(consumer); + } + } + public ConsumerTarget_0_10 getSubscription(String destination) { return _subscriptions.get(destination == null ? NULL_DESTINATION : destination); @@ -949,6 +971,41 @@ public class ServerSession extends Session } @Override + public Collection<Consumer<?>> getConsumers() + { + + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override public int compareTo(ServerSession o) { return getId().compareTo(o.getId()); @@ -966,4 +1023,37 @@ public class ServerSession extends Session } } } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java index 2593c66191..040be92ceb 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ServerSessionDelegate.java @@ -27,6 +27,7 @@ import java.util.LinkedHashMap; import java.util.UUID; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.DirectExchange; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; @@ -51,7 +52,6 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AlreadyKnownDtxException; import org.apache.qpid.server.txn.DtxNotSelectedException; import org.apache.qpid.server.txn.IncorrectDtxStateException; @@ -234,25 +234,25 @@ public class ServerSessionDelegate extends SessionDelegate ((ServerSession)session).register(destination, target); try { - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); if(method.getAcquireMode() == MessageAcquireMode.PRE_ACQUIRED) { - options.add(Consumer.Option.ACQUIRES); + options.add(ConsumerImpl.Option.ACQUIRES); } if(method.getAcquireMode() != MessageAcquireMode.NOT_ACQUIRED || method.getAcceptMode() == MessageAcceptMode.EXPLICIT) { - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(method.getExclusive()) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } - Consumer sub = + ((ServerSession)session).register( queue.addConsumer(target, filterManager, MessageTransferMessage.class, destination, - options); + options)); } catch (AMQQueue.ExistingExclusiveConsumer existing) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 780e7ad199..baf5eceef7 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -33,6 +33,7 @@ import org.apache.log4j.Logger; import org.apache.qpid.AMQConnectionException; import org.apache.qpid.AMQException; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.Filterable; @@ -66,7 +67,12 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.ConfigurationChangeListener; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.CapacityChecker; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.queue.AMQQueue; @@ -76,7 +82,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.StoreFuture; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AsyncAutoCommitTransaction; import org.apache.qpid.server.txn.LocalTransaction; import org.apache.qpid.server.txn.LocalTransaction.ActivityTimeAccessor; @@ -173,6 +178,9 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final CapacityCheckAction _capacityCheckAction = new CapacityCheckAction(); private final ImmediateAction _immediateAction = new ImmediateAction(); private Subject _subject; + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); public AMQChannel(T session, int channelId, final MessageStore messageStore) @@ -526,7 +534,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } - public Consumer getSubscription(AMQShortString tag) + public ConsumerImpl getSubscription(AMQShortString tag) { final ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.get(tag); return target == null ? null : target.getConsumer(); @@ -545,7 +553,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * @param exclusive Flag requesting exclusive access to the queue * @return the consumer tag. This is returned to the subscriber and used in subsequent unsubscribe requests * - * @throws AMQException if something goes wrong + * @throws org.apache.qpid.AMQException if something goes wrong */ public AMQShortString consumeFromSource(AMQShortString tag, MessageSource source, boolean acks, FieldTable filters, boolean exclusive, boolean noLocal) @@ -564,7 +572,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } ConsumerTarget_0_8 target; - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); if(filters != null && Boolean.TRUE.equals(filters.get(AMQPFilterTypes.NO_CONSUME.getValue()))) { @@ -573,19 +581,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> else if(acks) { target = ConsumerTarget_0_8.createAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else { target = ConsumerTarget_0_8.createNoAckTarget(this, tag, filters, _creditManager); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } if(exclusive) { - options.add(Consumer.Option.EXCLUSIVE); + options.add(ConsumerImpl.Option.EXCLUSIVE); } @@ -615,12 +623,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> } }); } - Consumer sub = + ConsumerImpl sub = source.addConsumer(target, filterManager, - AMQMessage.class, - AMQShortString.toString(tag), - options); + AMQMessage.class, + AMQShortString.toString(tag), + options); + if(sub instanceof Consumer<?>) + { + final Consumer<?> modelConsumer = (Consumer<?>) sub; + consumerAdded(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + _consumers.add(modelConsumer); + } } catch (AccessControlException e) { @@ -659,15 +674,19 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { ConsumerTarget_0_8 target = _tag2SubscriptionTargetMap.remove(consumerTag); - Consumer sub = target == null ? null : target.getConsumer(); + ConsumerImpl sub = target == null ? null : target.getConsumer(); if (sub != null) { sub.close(); + if(sub instanceof Consumer<?>) + { + _consumers.remove(sub); + } return true; } else { - _logger.warn("Attempt to unsubscribe consumer with tag '"+consumerTag+"' which is not registered."); + _logger.warn("Attempt to unsubscribe consumer with tag '" + consumerTag + "' which is not registered."); } return false; } @@ -735,7 +754,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> _logger.info("Unsubscribing consumer '" + me.getKey() + "' on channel " + toString()); } - Consumer sub = me.getValue().getConsumer(); + ConsumerImpl sub = me.getValue().getConsumer(); if(sub != null) { @@ -754,7 +773,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> * delivery tag) * @param consumer The consumer that is to acknowledge this message. */ - public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, Consumer consumer) + public void addUnacknowledgedMessage(MessageInstance entry, long deliveryTag, ConsumerImpl consumer) { if (_logger.isDebugEnabled()) { @@ -1126,7 +1145,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> for(MessageInstance entry : _resendList) { - Consumer sub = entry.getDeliveredConsumer(); + ConsumerImpl sub = entry.getDeliveredConsumer(); if(sub == null || sub.isClosed()) { entry.release(); @@ -1199,7 +1218,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> private final RecordDeliveryMethod _recordDeliveryMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { addUnacknowledgedMessage(entry, deliveryTag, sub); } @@ -1658,4 +1677,71 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { return _tag2SubscriptionTargetMap.size(); } + + @Override + public Collection<Consumer<?>> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final State oldState, final State newState) + { + if(newState == State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } + + private void consumerAdded(final Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(final Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + @Override + public void addConsumerListener(ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(ConsumerListener listener) + { + _consumerListeners.remove(listener); + } } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java index 2ebcde199b..a86530fe0e 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQProtocolEngine.java @@ -56,10 +56,13 @@ import org.apache.qpid.protocol.AMQConstant; import org.apache.qpid.protocol.AMQMethodEvent; import org.apache.qpid.protocol.ServerProtocolEngine; import org.apache.qpid.server.connection.ConnectionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.configuration.BrokerProperties; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.protocol.v0_8.handler.ServerMethodDispatcherImpl; import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.logging.messages.ConnectionMessages; @@ -73,7 +76,6 @@ import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.protocol.v0_8.state.AMQState; import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.stats.StatisticsCounter; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; import org.apache.qpid.server.util.ServerScopedRuntimeException; @@ -102,6 +104,8 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi private final Map<Integer, AMQChannel<AMQProtocolEngine>> _channelMap = new HashMap<Integer, AMQChannel<AMQProtocolEngine>>(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); @SuppressWarnings("unchecked") private final AMQChannel<AMQProtocolEngine>[] _cachedChannels = new AMQChannel[CHANNEL_CACHE_SIZE + 1]; @@ -759,7 +763,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi synchronized (_channelMap) { _channelMap.put(channel.getChannelId(), channel); - + sessionAdded(channel); if(_blocking) { channel.block(); @@ -773,6 +777,22 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } } + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + public Long getMaximumNumberOfChannels() { return _maxNoOfChannels; @@ -844,15 +864,16 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi */ public void removeChannel(int channelId) { + AMQChannel<AMQProtocolEngine> session; synchronized (_channelMap) { - _channelMap.remove(channelId); - + session = _channelMap.remove(channelId); if ((channelId & CHANNEL_CACHE_SIZE) == channelId) { _cachedChannels[channelId] = null; } } + sessionRemoved(session); } /** @@ -1509,6 +1530,18 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi return String.valueOf(getContextKey()); } + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + public void setDeferFlush(boolean deferFlush) { _deferFlush = deferFlush; @@ -1525,7 +1558,7 @@ public class AMQProtocolEngine implements ServerProtocolEngine, AMQProtocolSessi } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { registerMessageDelivered(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java index 9f8799f68e..fa26a73f93 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ClientDeliveryMethod.java @@ -20,13 +20,12 @@ */ package org.apache.qpid.server.protocol.v0_8; -import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.ServerMessage; -import org.apache.qpid.server.consumer.Consumer; public interface ClientDeliveryMethod { - void deliverToClient(final Consumer sub, final ServerMessage message, final InstanceProperties props, + void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java index 2ce8caefc9..3de89a1d70 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ConsumerTarget_0_8.java @@ -25,17 +25,16 @@ import org.apache.qpid.AMQException; import org.apache.qpid.common.AMQPFilterTypes; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; -import org.apache.qpid.server.protocol.v0_8.handler.BasicGetMethodHandler; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.StateChangeListener; @@ -71,7 +70,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen private final AtomicLong _unacknowledgedCount = new AtomicLong(0); private final AtomicLong _unacknowledgedBytes = new AtomicLong(0); - private Consumer _consumer; + private ConsumerImpl _consumer; public static ConsumerTarget_0_8 createBrowserTarget(AMQChannel channel, @@ -368,18 +367,18 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen } } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @@ -428,7 +427,7 @@ public abstract class ConsumerTarget_0_8 extends AbstractConsumerTarget implemen boolean closed = false; State state = getState(); - final Consumer consumer = getConsumer(); + final ConsumerImpl consumer = getConsumer(); if(consumer != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java index 1de1638c2e..7a2fdb05fc 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeue.java @@ -23,8 +23,8 @@ package org.apache.qpid.server.protocol.v0_8; import org.apache.log4j.Logger; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; import java.util.Map; @@ -49,7 +49,7 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor { message.setRedelivered(); - final Consumer consumer = message.getDeliveredConsumer(); + final ConsumerImpl consumer = message.getDeliveredConsumer(); if (consumer != null) { // Consumer exists diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java index 70d7da3432..c13ff17f67 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/RecordDeliveryMethod.java @@ -20,10 +20,10 @@ */ package org.apache.qpid.server.protocol.v0_8; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; -import org.apache.qpid.server.consumer.Consumer; public interface RecordDeliveryMethod { - void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag); + void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag); } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java index f620abf30f..76b5cbbbb9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/handler/BasicGetMethodHandler.java @@ -29,6 +29,7 @@ import org.apache.qpid.framing.BasicGetBody; import org.apache.qpid.framing.BasicGetEmptyBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; @@ -44,7 +45,6 @@ import org.apache.qpid.server.protocol.v0_8.state.AMQStateManager; import org.apache.qpid.server.protocol.v0_8.state.StateAwareMethodListener; import org.apache.qpid.server.protocol.v0_8.ClientDeliveryMethod; import org.apache.qpid.server.protocol.v0_8.RecordDeliveryMethod; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import java.security.AccessControlException; @@ -150,15 +150,15 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB final RecordDeliveryMethod getRecordMethod = new RecordDeliveryMethod() { - public void recordMessageDelivery(final Consumer sub, final MessageInstance entry, final long deliveryTag) + public void recordMessageDelivery(final ConsumerImpl sub, final MessageInstance entry, final long deliveryTag) { channel.addUnacknowledgedMessage(entry, deliveryTag, null); } }; ConsumerTarget_0_8 target; - EnumSet<Consumer.Option> options = EnumSet.of(Consumer.Option.TRANSIENT, Consumer.Option.ACQUIRES, - Consumer.Option.SEES_REQUEUES); + EnumSet<ConsumerImpl.Option> options = EnumSet.of(ConsumerImpl.Option.TRANSIENT, ConsumerImpl.Option.ACQUIRES, + ConsumerImpl.Option.SEES_REQUEUES); if(acks) { @@ -173,7 +173,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB singleMessageCredit, getDeliveryMethod, getRecordMethod); } - Consumer sub = queue.addConsumer(target, null, AMQMessage.class, "", options); + ConsumerImpl sub = queue.addConsumer(target, null, AMQMessage.class, "", options); sub.flush(); sub.close(); return(getDeliveryMethod.hasDeliveredMessage()); @@ -202,7 +202,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener<BasicGetB } @Override - public void deliverToClient(final Consumer sub, final ServerMessage message, + public void deliverToClient(final ConsumerImpl sub, final ServerMessage message, final InstanceProperties props, final long deliveryTag) { _singleMessageCredit.useCreditForMessage(message.getSize()); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java index 8d7de4cd93..e5cfced4e2 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/AckTest.java @@ -26,13 +26,13 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.flow.LimitlessCreditManager; import org.apache.qpid.server.flow.Pre0_10CreditManager; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.store.TestableMemoryMessageStore; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.BrokerTestHelper; @@ -49,7 +49,7 @@ import java.util.Set; public class AckTest extends QpidTestCase { private ConsumerTarget_0_8 _subscriptionTarget; - private Consumer _consumer; + private ConsumerImpl _consumer; private AMQProtocolSession _protocolSession; @@ -176,8 +176,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -209,8 +209,8 @@ public class AckTest extends QpidTestCase null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); UnacknowledgedMessageMap map = _channel.getUnacknowledgedMessageMap(); @@ -232,7 +232,7 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount, true); @@ -255,8 +255,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -292,8 +292,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -326,8 +326,8 @@ public class AckTest extends QpidTestCase null, new LimitlessCreditManager()); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 10; publishMessages(msgCount); @@ -360,8 +360,8 @@ public class AckTest extends QpidTestCase _subscriptionTarget = ConsumerTarget_0_8.createAckTarget(_channel, DEFAULT_CONSUMER_TAG, null, creditManager); _consumer = _queue.addConsumer(_subscriptionTarget, null, AMQMessage.class, DEFAULT_CONSUMER_TAG.toString(), - EnumSet.of(Consumer.Option.SEES_REQUEUES, - Consumer.Option.ACQUIRES)); + EnumSet.of(ConsumerImpl.Option.SEES_REQUEUES, + ConsumerImpl.Option.ACQUIRES)); final int msgCount = 1; publishMessages(msgCount); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java index f18da87d09..6d3e648369 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/ExtractResendAndRequeueTest.java @@ -23,11 +23,11 @@ package org.apache.qpid.server.protocol.v0_8; import junit.framework.TestCase; import org.apache.qpid.AMQException; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.server.consumer.Consumer; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -63,7 +63,7 @@ public class ExtractResendAndRequeueTest extends TestCase private static final int INITIAL_MSG_COUNT = 10; private AMQQueue _queue; private LinkedList<MessageInstance> _referenceList = new LinkedList<MessageInstance>(); - private Consumer _consumer; + private ConsumerImpl _consumer; private boolean _queueDeleted; @Override @@ -74,8 +74,8 @@ public class ExtractResendAndRequeueTest extends TestCase _queue = mock(AMQQueue.class); when(_queue.getName()).thenReturn(getName()); when(_queue.isDeleted()).thenReturn(_queueDeleted); - _consumer = mock(Consumer.class); - when(_consumer.getConsumerNumber()).thenReturn(Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); + _consumer = mock(ConsumerImpl.class); + when(_consumer.getConsumerNumber()).thenReturn(ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement()); long id = 0; diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java index eaa5b6a7a5..18949bba50 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/test/java/org/apache/qpid/server/protocol/v0_8/InternalTestProtocolSession.java @@ -41,6 +41,7 @@ import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQConstant; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageContentSource; import org.apache.qpid.server.message.ServerMessage; @@ -48,7 +49,6 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.virtualhost.VirtualHost; import org.apache.qpid.transport.Sender; import org.apache.qpid.transport.network.NetworkConnection; @@ -244,7 +244,7 @@ public class InternalTestProtocolSession extends AMQProtocolEngine implements Pr @Override - public void deliverToClient(Consumer sub, ServerMessage message, + public void deliverToClient(ConsumerImpl sub, ServerMessage message, InstanceProperties props, long deliveryTag) { _deliveryCount.incrementAndGet(); diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java index 0a53a6436a..00c78581e1 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Connection_1_0.java @@ -40,6 +40,8 @@ import org.apache.qpid.server.model.Broker; import org.apache.qpid.server.model.Port; import org.apache.qpid.server.model.Transport; import org.apache.qpid.server.protocol.AMQConnectionModel; +import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.SessionModelListener; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.stats.StatisticsCounter; @@ -51,6 +53,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.List; import java.util.Set; +import java.util.concurrent.CopyOnWriteArrayList; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.CONNECTION_FORMAT; @@ -68,6 +71,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod private final Object _reference = new Object(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList<SessionModelListener> _sessionListeners = + new CopyOnWriteArrayList<SessionModelListener>(); private StatisticsCounter _messageDeliveryStatistics = new StatisticsCounter(); private StatisticsCounter _messageReceiptStatistics = new StatisticsCounter(); @@ -111,7 +116,6 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod _connectionId = connectionId; _subject.getPrincipals().add(new ConnectionPrincipal(this)); _subjectCreator = subjectCreator; - //_vhost.getConnectionRegistry().registerConnection(this); } @@ -129,6 +133,8 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod host = (String)_broker.getAttribute(Broker.DEFAULT_VIRTUAL_HOST); } _vhost = _broker.getVirtualHostRegistry().getVirtualHost(host); + _vhost.getConnectionRegistry().registerConnection(this); + if(_vhost == null) { final Error err = new Error(); @@ -147,6 +153,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { final Session_1_0 session = new Session_1_0(this, endpoint); _sessions.add(session); + sessionAdded(session); endpoint.setSessionEventListener(new SessionEventListener() { @Override @@ -182,6 +189,7 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod void sessionEnded(Session_1_0 session) { _sessions.remove(session); + sessionRemoved(session); } public void removeDeleteTask(final Action<? super Connection_1_0> task) @@ -428,4 +436,35 @@ public class Connection_1_0 implements ConnectionEventListener, AMQConnectionMod { return _vhost; } + + + @Override + public void addSessionListener(final SessionModelListener listener) + { + _sessionListeners.add(listener); + } + + @Override + public void removeSessionListener(final SessionModelListener listener) + { + _sessionListeners.remove(listener); + } + + private void sessionAdded(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionAdded(session); + } + } + + private void sessionRemoved(final AMQSessionModel<?,?> session) + { + for(SessionModelListener l : _sessionListeners) + { + l.sessionRemoved(session); + } + } + + } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java index f3417710a5..adb2f8ea6a 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java @@ -37,13 +37,13 @@ import org.apache.qpid.amqp_1_0.type.messaging.Released; import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.consumer.AbstractConsumerTarget; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -60,7 +60,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget private Binary _transactionId; private final AMQPDescribedTypeRegistry _typeRegistry; private final SectionEncoder _sectionEncoder; - private Consumer _consumer; + private ConsumerImpl _consumer; public ConsumerTarget_1_0(final SendingLink_1_0 link, boolean acquires) @@ -72,7 +72,7 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget _acquires = acquires; } - public Consumer getConsumer() + public ConsumerImpl getConsumer() { return _consumer; } @@ -498,13 +498,13 @@ class ConsumerTarget_1_0 extends AbstractConsumerTarget } @Override - public void consumerAdded(final Consumer sub) + public void consumerAdded(final ConsumerImpl sub) { _consumer = sub; } @Override - public void consumerRemoved(final Consumer sub) + public void consumerRemoved(final ConsumerImpl sub) { } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java index 24395a6fad..eb1f75b771 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java @@ -26,6 +26,7 @@ import java.util.concurrent.ConcurrentHashMap; import org.apache.log4j.Logger; import org.apache.qpid.server.binding.BindingImpl; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ExclusivityPolicy; import org.apache.qpid.server.model.LifetimePolicy; @@ -55,7 +56,6 @@ import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.model.UUIDGenerator; import org.apache.qpid.server.queue.AMQQueue; -import org.apache.qpid.server.consumer.Consumer; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; @@ -69,7 +69,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS private VirtualHost _vhost; private SendingDestination _destination; - private Consumer _consumer; + private ConsumerImpl _consumer; private ConsumerTarget_1_0 _target; private boolean _draining; @@ -99,7 +99,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS linkAttachment.setDeliveryStateHandler(this); QueueDestination qd = null; - EnumSet<Consumer.Option> options = EnumSet.noneOf(Consumer.Option.class); + EnumSet<ConsumerImpl.Option> options = EnumSet.noneOf(ConsumerImpl.Option.class); boolean noLocal = false; @@ -163,8 +163,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, source.getDistributionMode() != StdDistMode.COPY); if(source.getDistributionMode() != StdDistMode.COPY) { - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } } @@ -318,8 +318,8 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS _target = new ConsumerTarget_1_0(this, true); - options.add(Consumer.Option.ACQUIRES); - options.add(Consumer.Option.SEES_REQUEUES); + options.add(ConsumerImpl.Option.ACQUIRES); + options.add(ConsumerImpl.Option.SEES_REQUEUES); } else @@ -331,7 +331,7 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { if(noLocal) { - options.add(Consumer.Option.NO_LOCAL); + options.add(ConsumerImpl.Option.NO_LOCAL); } try @@ -372,7 +372,6 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS public void resume(SendingLinkAttachment linkAttachment) { _linkAttachment = linkAttachment; - } public void remoteDetached(final LinkEndpoint endpoint, final Detach detach) @@ -692,4 +691,9 @@ public class SendingLink_1_0 implements SendingLinkListener, Link_1_0, DeliveryS { return _vhost; } + + public ConsumerImpl getConsumer() + { + return _consumer; + } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java index 411117be4d..e124b4d5ac 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Session_1_0.java @@ -43,6 +43,7 @@ import org.apache.qpid.amqp_1_0.type.transport.*; import org.apache.qpid.amqp_1_0.type.transport.Error; import org.apache.qpid.server.connection.SessionPrincipal; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.*; import org.apache.qpid.protocol.AMQConstant; @@ -50,6 +51,7 @@ import org.apache.qpid.server.logging.LogSubject; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.protocol.AMQSessionModel; +import org.apache.qpid.server.protocol.ConsumerListener; import org.apache.qpid.server.protocol.LinkRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.txn.AutoCommitTransaction; @@ -85,6 +87,9 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio private AtomicBoolean _closed = new AtomicBoolean(); private final Subject _subject = new Subject(); + private final CopyOnWriteArrayList<Consumer<?>> _consumers = new CopyOnWriteArrayList<Consumer<?>>(); + private final ConfigurationChangeListener _consumerClosedListener = new ConsumerClosedListener(); + private final CopyOnWriteArrayList<ConsumerListener> _consumerListeners = new CopyOnWriteArrayList<ConsumerListener>(); public Session_1_0(final Connection_1_0 connection, final SessionEndpoint endpoint) @@ -184,6 +189,7 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio ); sendingLinkEndpoint.setLinkEventListener(new SubjectSpecificSendingLinkListener(sendingLink)); + registerConsumer(sendingLink.getConsumer()); link = sendingLink; if(TerminusDurability.UNSETTLED_STATE.equals(source.getDurable())) @@ -383,6 +389,17 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio } } + private void registerConsumer(final ConsumerImpl consumer) + { + if(consumer instanceof Consumer<?>) + { + Consumer<?> modelConsumer = (Consumer<?>) consumer; + _consumers.add(modelConsumer); + modelConsumer.addChangeListener(_consumerClosedListener); + consumerAdded(modelConsumer); + } + } + private AMQQueue createTemporaryQueue(Map properties) { @@ -653,11 +670,11 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio @Override public int getConsumerCount() { - // TODO - return 0; + return getConsumers().size(); } + public String toLogString() { long connectionId = getConnectionModel().getConnectionId(); @@ -785,4 +802,72 @@ public class Session_1_0 implements SessionEventListener, AMQSessionModel<Sessio }); } } + + + @Override + public Collection<Consumer<?>> getConsumers() + { + return Collections.unmodifiableCollection(_consumers); + } + + @Override + public void addConsumerListener(final ConsumerListener listener) + { + _consumerListeners.add(listener); + } + + @Override + public void removeConsumerListener(final ConsumerListener listener) + { + _consumerListeners.remove(listener); + } + + private void consumerAdded(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerAdded(consumer); + } + } + + private void consumerRemoved(Consumer<?> consumer) + { + for(ConsumerListener l : _consumerListeners) + { + l.consumerRemoved(consumer); + } + } + + private class ConsumerClosedListener implements ConfigurationChangeListener + { + @Override + public void stateChanged(final ConfiguredObject object, final org.apache.qpid.server.model.State oldState, final org.apache.qpid.server.model.State newState) + { + if(newState == org.apache.qpid.server.model.State.DELETED) + { + consumerRemoved((Consumer<?>)object); + } + } + + @Override + public void childAdded(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void childRemoved(final ConfiguredObject object, final ConfiguredObject child) + { + + } + + @Override + public void attributeSet(final ConfiguredObject object, + final String attributeName, + final Object oldAttributeValue, + final Object newAttributeValue) + { + + } + } } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java index a47506f804..788ce63c8f 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNode.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.Filterable; @@ -949,7 +949,7 @@ class ManagementNode implements MessageSource, MessageDestination final FilterManager filters, final Class<? extends ServerMessage> messageClass, final String consumerName, - final EnumSet<Consumer.Option> options) + final EnumSet<ConsumerImpl.Option> options) { final ManagementNodeConsumer managementNodeConsumer = new ManagementNodeConsumer(consumerName,this, target); @@ -1054,7 +1054,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return false; } @@ -1072,7 +1072,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public Consumer getDeliveredConsumer() + public ConsumerImpl getDeliveredConsumer() { return null; } @@ -1084,7 +1084,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -1102,7 +1102,7 @@ class ManagementNode implements MessageSource, MessageDestination } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java index 8a1f39fdfe..a3b1f932ac 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementNodeConsumer.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.consumer.ConsumerTarget; import org.apache.qpid.server.message.MessageSource; import org.apache.qpid.server.message.internal.InternalMessage; @@ -33,9 +33,9 @@ import java.util.List; import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; -class ManagementNodeConsumer implements Consumer +class ManagementNodeConsumer implements ConsumerImpl { - private final long _id = Consumer.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); + private final long _id = ConsumerImpl.CONSUMER_NUMBER_GENERATOR.getAndIncrement(); private final ManagementNode _managementNode; private final List<ManagementResponse> _queue = Collections.synchronizedList(new ArrayList<ManagementResponse>()); private final ConsumerTarget _target; diff --git a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java index 18c68bd198..ae2828d392 100644 --- a/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java +++ b/qpid/java/broker-plugins/management-amqp/src/main/java/org/apache/qpid/server/management/amqp/ManagementResponse.java @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.management.amqp; -import org.apache.qpid.server.consumer.Consumer; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.filter.Filterable; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageInstance; @@ -84,7 +84,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isAcquiredBy(final Consumer consumer) + public boolean isAcquiredBy(final ConsumerImpl consumer) { return consumer == _consumer && !isDeleted(); } @@ -114,7 +114,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean isRejectedBy(final Consumer consumer) + public boolean isRejectedBy(final ConsumerImpl consumer) { return false; } @@ -132,7 +132,7 @@ class ManagementResponse implements MessageInstance } @Override - public boolean acquire(final Consumer sub) + public boolean acquire(final ConsumerImpl sub) { return false; } diff --git a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java index baf92e8522..0947ae2a89 100644 --- a/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java +++ b/qpid/java/broker-plugins/management-http/src/main/java/org/apache/qpid/server/management/plugin/servlet/rest/MessageServlet.java @@ -31,6 +31,7 @@ import javax.servlet.http.HttpServletRequest; import javax.servlet.http.HttpServletResponse; import org.apache.log4j.Logger; +import org.apache.qpid.server.consumer.ConsumerImpl; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; @@ -40,7 +41,6 @@ import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.QueueEntryVisitor; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; -import org.apache.qpid.server.consumer.Consumer; import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializationConfig; @@ -327,7 +327,7 @@ public class MessageServlet extends AbstractServlet : entry.isAcquired() ? "Acquired" : ""); - final Consumer deliveredConsumer = entry.getDeliveredConsumer(); + final ConsumerImpl deliveredConsumer = entry.getDeliveredConsumer(); object.put("deliveredTo", deliveredConsumer == null ? null : deliveredConsumer.getConsumerNumber()); ServerMessage message = entry.getMessage(); |
