summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java12
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java10
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java36
-rw-r--r--java/test-profiles/python_tests/Java010PythonExcludes6
4 files changed, 47 insertions, 17 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index fea5303e23..6c9e918324 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -417,11 +417,19 @@ public abstract class QueueEntryImpl implements QueueEntry
if (alternateExchange != null)
{
- final List<? extends BaseQueue> rerouteQueues = alternateExchange.route(new InboundMessageAdapter(this));
+ InboundMessageAdapter inboundMessageAdapter = new InboundMessageAdapter(this);
+ List<? extends BaseQueue> queues = alternateExchange.route(inboundMessageAdapter);
final ServerMessage message = getMessage();
- if (rerouteQueues != null && rerouteQueues.size() != 0)
+ if ((queues == null || queues.size() == 0) && alternateExchange.getAlternateExchange() != null)
{
+ queues = alternateExchange.getAlternateExchange().route(inboundMessageAdapter);
+ }
+
+
+ if (queues != null && queues.size() != 0)
+ {
+ final List<? extends BaseQueue> rerouteQueues = queues;
ServerTransaction txn = new LocalTransaction(getQueue().getVirtualHost().getMessageStore());
txn.enqueue(rerouteQueues, message, new ServerTransaction.Action()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index f43de76251..7d0eb0c838 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -1515,10 +1515,16 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener, Mes
for(final QueueEntry entry : entries)
{
adapter.setEntry(entry);
- final List<? extends BaseQueue> rerouteQueues = _alternateExchange.route(adapter);
+ List<? extends BaseQueue> queues = _alternateExchange.route(adapter);
+ if((queues == null || queues.size() == 0) && _alternateExchange.getAlternateExchange() != null)
+ {
+ queues = _alternateExchange.getAlternateExchange().route(adapter);
+ }
+
final ServerMessage message = entry.getMessage();
- if(rerouteQueues != null && rerouteQueues.size() != 0)
+ if(queues != null && queues.size() != 0)
{
+ final List<? extends BaseQueue> rerouteQueues = queues;
txn.enqueue(rerouteQueues, entry.getMessage(),
new ServerTransaction.Action()
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index ce4153a8ff..4648a53b40 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -505,19 +505,40 @@ public class ServerSessionDelegate extends SessionDelegate
method.getAutoDelete());
String alternateExchangeName = method.getAlternateExchange();
+ boolean validAlternate;
if(alternateExchangeName != null && alternateExchangeName.length() != 0)
{
Exchange alternate = getExchange(session, alternateExchangeName);
- exchange.setAlternateExchange(alternate);
+ if(alternate == null)
+ {
+ validAlternate = false;
+ }
+ else
+ {
+ exchange.setAlternateExchange(alternate);
+ validAlternate = true;
+ }
}
-
- if (exchange.isDurable())
+ else
{
- DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
- store.createExchange(exchange);
+ validAlternate = true;
}
- exchangeRegistry.registerExchange(exchange);
+ if(validAlternate)
+ {
+ if (exchange.isDurable())
+ {
+ DurableConfigurationStore store = virtualHost.getDurableConfigurationStore();
+ store.createExchange(exchange);
+ }
+
+ exchangeRegistry.registerExchange(exchange);
+ }
+ else
+ {
+ exception(session, method, ExecutionErrorCode.NOT_FOUND,
+ "Unknown alternate exchange " + alternateExchangeName);
+ }
}
catch(AMQUnknownExchangeType e)
{
@@ -539,7 +560,8 @@ public class ServerSessionDelegate extends SessionDelegate
+ " to " + method.getType() +".");
}
else if(method.hasAlternateExchange()
- && !(method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
+ && (exchange.getAlternateExchange() == null ||
+ !method.getAlternateExchange().equals(exchange.getAlternateExchange().getName())))
{
exception(session, method, ExecutionErrorCode.NOT_ALLOWED,
"Attempt to change alternate exchange of: " + exchangeName
diff --git a/java/test-profiles/python_tests/Java010PythonExcludes b/java/test-profiles/python_tests/Java010PythonExcludes
index 11ddc1cda9..e6df44ab15 100644
--- a/java/test-profiles/python_tests/Java010PythonExcludes
+++ b/java/test-profiles/python_tests/Java010PythonExcludes
@@ -63,12 +63,6 @@ qpid_tests.broker_0_10.message.MessageTests.test_window_flow_bytes
#QPID-3605 Durable subscriber with no-local true receives messages on re-connection
qpid_tests.broker_0_10.message.MessageTests.test_no_local_awkward
-#QPID-3595 Alternate Exchanges support requires work to be spec compliant.
-qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_queue_delete_no_match
-qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_reject_no_match
-qpid_tests.broker_0_10.alternate_exchange.AlternateExchangeTests.test_add_alternate_to_exchange
-qpid_tests.broker_0_10.new_api.GeneralTests.test_qpid_3481_acquired_to_alt_exchange
-
#QPID-3599 Tests fail due to differences in expected message Redelivered status
qpid.tests.messaging.endpoints.SessionTests.testCommitAck
qpid.tests.messaging.endpoints.SessionTests.testRelease