diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 13:37:32 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-04-18 13:37:32 +0000 |
| commit | ef4adc559cc4b81a0c681807986c62fc0b9a13e4 (patch) | |
| tree | a66bb7845d9074bb79253f11bd5636c9a30e5324 /qpid/java/broker-plugins | |
| parent | 36c6512134a729f2f7abb1fa6469a63b743dad1b (diff) | |
| download | qpid-python-ef4adc559cc4b81a0c681807986c62fc0b9a13e4.tar.gz | |
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
4 files changed, 36 insertions, 18 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java index a3fabf076c..d73d019000 100644 --- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java +++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java @@ -20,28 +20,37 @@ */ package org.apache.qpid.server.protocol.v0_10; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.logging.EventLogger; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.message.MessageInstance; import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.plugin.MessageConverter; import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.TransactionLogResource; -import org.apache.qpid.server.consumer.AbstractConsumerTarget; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; -import org.apache.qpid.transport.*; - -import java.util.Collections; -import java.util.Map; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageAcceptMode; +import org.apache.qpid.transport.MessageAcquireMode; +import org.apache.qpid.transport.MessageCreditUnit; +import org.apache.qpid.transport.MessageFlowMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.MessageTransfer; +import org.apache.qpid.transport.Method; +import org.apache.qpid.transport.Option; public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener { @@ -405,7 +414,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC if(owningResource instanceof AMQQueue) { final AMQQueue queue = (AMQQueue)owningResource; - final ExchangeImpl alternateExchange = queue.getAlternateExchange(); + final Exchange alternateExchange = queue.getAlternateExchange(); if(alternateExchange != null) { diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 70094ea7c7..1f108ec3e9 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -63,7 +63,6 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction; import org.apache.qpid.server.configuration.BrokerProperties; import org.apache.qpid.server.connection.SessionPrincipal; import org.apache.qpid.server.consumer.ConsumerImpl; -import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.filter.AMQInvalidArgumentException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.filter.FilterManagerFactory; @@ -86,6 +85,7 @@ import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.ConfigurationChangeListener; import org.apache.qpid.server.model.ConfiguredObject; import org.apache.qpid.server.model.Consumer; +import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.model.Session; import org.apache.qpid.server.model.State; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -1616,7 +1616,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>> { final AMQQueue queue = (AMQQueue) owningResource; - final ExchangeImpl altExchange = queue.getAlternateExchange(); + final Exchange altExchange = queue.getAlternateExchange(); if (altExchange == null) { diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java index e95ed9a383..a9fc160618 100644 --- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java +++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java @@ -24,6 +24,7 @@ package org.apache.qpid.server.jmx.mbeans; import java.io.IOException; import java.nio.ByteBuffer; import java.util.ArrayList; +import java.util.Collections; import java.util.List; import java.util.concurrent.atomic.AtomicLong; import javax.management.JMException; @@ -294,14 +295,19 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN { if (exchangeName == null || "".equals(exchangeName)) { - _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), null); + _queue.setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null)); } else { - VirtualHost<?,?,?> virtualHost = _queue.getParent(VirtualHost.class); - Exchange exchange = MBeanUtils.findExchangeFromExchangeName(virtualHost, exchangeName); + try + { - _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), exchange); + _queue.setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, exchangeName)); + } + catch (IllegalArgumentException e) + { + throw new OperationsException("No such exchange \""+exchangeName+"\""); + } } } diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java index a81de49fb9..6a49b7c4ed 100644 --- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java +++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java @@ -19,8 +19,10 @@ package org.apache.qpid.server.jmx.mbeans; import static org.mockito.Matchers.argThat; +import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Mockito.doAnswer; +import static org.mockito.Mockito.doThrow; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; @@ -320,7 +322,7 @@ public class QueueMBeanTest extends QpidTestCase when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); _queueMBean.setAlternateExchange("exchange2"); - verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, mockExchange2); + verify(_mockQueue).setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "exchange2")); } public void testSetAlternateExchangeWithUnknownExchangeName() throws Exception @@ -331,7 +333,8 @@ public class QueueMBeanTest extends QpidTestCase VirtualHost mockVirtualHost = mock(VirtualHost.class); when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange)); when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost); - + doThrow(new IllegalArgumentException()).when(_mockQueue).setAttributes( + eq(Collections.<String, Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "notknown"))); try { _queueMBean.setAlternateExchange("notknown"); @@ -346,7 +349,7 @@ public class QueueMBeanTest extends QpidTestCase public void testRemoveAlternateExchange() throws Exception { _queueMBean.setAlternateExchange(""); - verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null); + verify(_mockQueue).setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null)); } /********** Operations **********/ |
