summaryrefslogtreecommitdiff
path: root/qpid/java/broker-plugins
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-04-18 13:37:32 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-04-18 13:37:32 +0000
commitef4adc559cc4b81a0c681807986c62fc0b9a13e4 (patch)
treea66bb7845d9074bb79253f11bd5636c9a30e5324 /qpid/java/broker-plugins
parent36c6512134a729f2f7abb1fa6469a63b743dad1b (diff)
downloadqpid-python-ef4adc559cc4b81a0c681807986c62fc0b9a13e4.tar.gz
QPID-5710 : [Java Broker] Use common creation/recovery mechanism for Queues
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1588468 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-plugins')
-rw-r--r--qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java27
-rw-r--r--qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java4
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java14
-rw-r--r--qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java9
4 files changed, 36 insertions, 18 deletions
diff --git a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
index a3fabf076c..d73d019000 100644
--- a/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
+++ b/qpid/java/broker-plugins/amqp-0-10-protocol/src/main/java/org/apache/qpid/server/protocol/v0_10/ConsumerTarget_0_10.java
@@ -20,28 +20,37 @@
*/
package org.apache.qpid.server.protocol.v0_10;
+import java.util.Collections;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.logging.EventLogger;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.message.MessageInstance;
import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.plugin.MessageConverter;
import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.store.TransactionLogResource;
-import org.apache.qpid.server.consumer.AbstractConsumerTarget;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
import org.apache.qpid.server.util.Action;
import org.apache.qpid.server.util.ConnectionScopedRuntimeException;
-import org.apache.qpid.transport.*;
-
-import java.util.Collections;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageAcceptMode;
+import org.apache.qpid.transport.MessageAcquireMode;
+import org.apache.qpid.transport.MessageCreditUnit;
+import org.apache.qpid.transport.MessageFlowMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.MessageTransfer;
+import org.apache.qpid.transport.Method;
+import org.apache.qpid.transport.Option;
public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowCreditManager.FlowCreditManagerListener
{
@@ -405,7 +414,7 @@ public class ConsumerTarget_0_10 extends AbstractConsumerTarget implements FlowC
if(owningResource instanceof AMQQueue)
{
final AMQQueue queue = (AMQQueue)owningResource;
- final ExchangeImpl alternateExchange = queue.getAlternateExchange();
+ final Exchange alternateExchange = queue.getAlternateExchange();
if(alternateExchange != null)
{
diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
index 70094ea7c7..1f108ec3e9 100644
--- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
+++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java
@@ -63,7 +63,6 @@ import org.apache.qpid.server.TransactionTimeoutHelper.CloseAction;
import org.apache.qpid.server.configuration.BrokerProperties;
import org.apache.qpid.server.connection.SessionPrincipal;
import org.apache.qpid.server.consumer.ConsumerImpl;
-import org.apache.qpid.server.exchange.ExchangeImpl;
import org.apache.qpid.server.filter.AMQInvalidArgumentException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
@@ -86,6 +85,7 @@ import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.model.ConfigurationChangeListener;
import org.apache.qpid.server.model.ConfiguredObject;
import org.apache.qpid.server.model.Consumer;
+import org.apache.qpid.server.model.Exchange;
import org.apache.qpid.server.model.Session;
import org.apache.qpid.server.model.State;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -1616,7 +1616,7 @@ public class AMQChannel<T extends AMQProtocolSession<T>>
{
final AMQQueue queue = (AMQQueue) owningResource;
- final ExchangeImpl altExchange = queue.getAlternateExchange();
+ final Exchange altExchange = queue.getAlternateExchange();
if (altExchange == null)
{
diff --git a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
index e95ed9a383..a9fc160618 100644
--- a/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
+++ b/qpid/java/broker-plugins/management-jmx/src/main/java/org/apache/qpid/server/jmx/mbeans/QueueMBean.java
@@ -24,6 +24,7 @@ package org.apache.qpid.server.jmx.mbeans;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
import javax.management.JMException;
@@ -294,14 +295,19 @@ public class QueueMBean extends AMQManagedObject implements ManagedQueue, QueueN
{
if (exchangeName == null || "".equals(exchangeName))
{
- _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), null);
+ _queue.setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null));
}
else
{
- VirtualHost<?,?,?> virtualHost = _queue.getParent(VirtualHost.class);
- Exchange exchange = MBeanUtils.findExchangeFromExchangeName(virtualHost, exchangeName);
+ try
+ {
- _queue.setAttribute(Queue.ALTERNATE_EXCHANGE, getAlternateExchange(), exchange);
+ _queue.setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, exchangeName));
+ }
+ catch (IllegalArgumentException e)
+ {
+ throw new OperationsException("No such exchange \""+exchangeName+"\"");
+ }
}
}
diff --git a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
index a81de49fb9..6a49b7c4ed 100644
--- a/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
+++ b/qpid/java/broker-plugins/management-jmx/src/test/java/org/apache/qpid/server/jmx/mbeans/QueueMBeanTest.java
@@ -19,8 +19,10 @@
package org.apache.qpid.server.jmx.mbeans;
import static org.mockito.Matchers.argThat;
+import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Mockito.doAnswer;
+import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -320,7 +322,7 @@ public class QueueMBeanTest extends QpidTestCase
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
_queueMBean.setAlternateExchange("exchange2");
- verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, mockExchange2);
+ verify(_mockQueue).setAttributes(Collections.<String,Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "exchange2"));
}
public void testSetAlternateExchangeWithUnknownExchangeName() throws Exception
@@ -331,7 +333,8 @@ public class QueueMBeanTest extends QpidTestCase
VirtualHost mockVirtualHost = mock(VirtualHost.class);
when(mockVirtualHost.getExchanges()).thenReturn(Collections.singletonList(mockExchange));
when(_mockQueue.getParent(VirtualHost.class)).thenReturn(mockVirtualHost);
-
+ doThrow(new IllegalArgumentException()).when(_mockQueue).setAttributes(
+ eq(Collections.<String, Object>singletonMap(Queue.ALTERNATE_EXCHANGE, "notknown")));
try
{
_queueMBean.setAlternateExchange("notknown");
@@ -346,7 +349,7 @@ public class QueueMBeanTest extends QpidTestCase
public void testRemoveAlternateExchange() throws Exception
{
_queueMBean.setAlternateExchange("");
- verify(_mockQueue).setAttribute(Queue.ALTERNATE_EXCHANGE, null, null);
+ verify(_mockQueue).setAttributes(Collections.singletonMap(Queue.ALTERNATE_EXCHANGE, null));
}
/********** Operations **********/