From 6713bfc5ddc1ff6202dad0d950a252273f73f795 Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Wed, 5 Mar 2014 16:04:16 +0000 Subject: QPID-4000 , QPID-5601 : Improve conversion of reply-to between different protocols. Add functionality to the default exchange to understand AMQP 1.0 addresses. git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1574551 13f79535-47bb-0310-9956-ffa450edef68 --- .../store/berkeleydb/BDBMessageStoreTest.java | 2 +- .../qpid/server/exchange/AbstractExchange.java | 15 +- .../qpid/server/exchange/DefaultDestination.java | 26 ++- .../qpid/server/exchange/DirectExchange.java | 6 +- .../qpid/server/exchange/FanoutExchange.java | 4 +- .../qpid/server/exchange/HeadersExchange.java | 4 +- .../apache/qpid/server/exchange/TopicExchange.java | 10 +- .../qpid/server/message/MessageDestination.java | 10 +- .../apache/qpid/server/message/ServerMessage.java | 2 +- .../server/message/internal/InternalMessage.java | 9 +- .../apache/qpid/server/queue/AbstractQueue.java | 8 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 1 + .../qpid/server/exchange/FanoutExchangeTest.java | 10 +- .../qpid/server/exchange/HeadersExchangeTest.java | 2 +- .../qpid/server/exchange/TopicExchangeTest.java | 4 +- .../qpid/server/store/TestMessageMetaDataType.java | 2 +- .../apache/qpid/server/txn/MockServerMessage.java | 2 +- .../server/protocol/v0_10/ConsumerTarget_0_10.java | 2 +- .../v0_10/MessageConverter_Internal_to_v0_10.java | 15 +- .../protocol/v0_10/MessageConverter_v0_10.java | 3 +- .../protocol/v0_10/MessageTransferMessage.java | 2 +- .../qpid/server/protocol/v0_10/ServerSession.java | 7 +- .../qpid/server/protocol/v0_8/AMQChannel.java | 9 +- .../qpid/server/protocol/v0_8/AMQMessage.java | 2 +- .../v0_8/MessageConverter_Internal_to_v0_8.java | 8 +- .../v0_8/MessageConverter_v0_8_to_Internal.java | 210 ++++++++++++++++++++- .../server/protocol/v1_0/ExchangeDestination.java | 2 +- .../server/protocol/v1_0/MessageMetaData_1_0.java | 5 + .../qpid/server/protocol/v1_0/Message_1_0.java | 10 +- .../protocol/v1_0/NodeReceivingDestination.java | 2 +- .../v0_10_v1_0/MessageConverter_0_10_to_1_0.java | 2 +- .../v0_10_v1_0/MessageConverter_1_0_to_v0_10.java | 51 ++++- .../v0_8_v0_10/MessageConverter_0_8_to_0_10.java | 2 +- .../v0_8_v1_0/MessageConverter_0_8_to_1_0.java | 42 ++++- .../server/management/amqp/ManagementNode.java | 16 +- .../org/apache/qpid/client/AMQDestination.java | 5 + .../java/org/apache/qpid/client/AMQSession.java | 6 +- .../qpid/client/AMQUndefinedDestination.java | 6 + .../apache/qpid/client/BasicMessageProducer.java | 4 +- .../client/message/AMQMessageDelegate_0_8.java | 99 ++++++++-- .../apache/qpid/server/store/MessageStoreTest.java | 3 +- 41 files changed, 515 insertions(+), 115 deletions(-) (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java index eb3b665242..bd0411619e 100644 --- a/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java +++ b/qpid/java/bdbstore/systests/src/main/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreTest.java @@ -585,7 +585,7 @@ public class BDBMessageStoreTest extends MessageStoreTest _messageId = messageId; } - public String getRoutingKey() + public String getInitialRoutingAddress() { return null; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java index 2a688f497a..e01f4b7db9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/AbstractExchange.java @@ -423,11 +423,12 @@ public abstract class AbstractExchange> final List route(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties) { _receivedMessageCount.incrementAndGet(); _receivedMessageSize.addAndGet(message.getSize()); - List queues = doRoute(message, instanceProperties); + List queues = doRoute(message, routingAddress, instanceProperties); List allQueues = queues; boolean deletedQueues = false; @@ -464,18 +465,19 @@ public abstract class AbstractExchange> } public final > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { - List queues = route(message, instanceProperties); + List queues = route(message, routingAddress, instanceProperties); if(queues == null || queues.isEmpty()) { ExchangeImpl altExchange = getAlternateExchange(); if(altExchange != null) { - return altExchange.send(message, instanceProperties, txn, postEnqueueAction); + return altExchange.send(message, routingAddress, instanceProperties, txn, postEnqueueAction); } else { @@ -515,6 +517,7 @@ public abstract class AbstractExchange> } protected abstract List doRoute(final ServerMessage message, + final String routingAddress, final InstanceProperties instanceProperties); @Override diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java index f59049d276..123a4f0a63 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DefaultDestination.java @@ -50,13 +50,31 @@ public class DefaultDestination implements MessageDestination public final > int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { - final AMQQueue q = _virtualHost.getQueue(message.getRoutingKey()); + final AMQQueue q = _virtualHost.getQueue(routingAddress); if(q == null) { + if(routingAddress.contains("/") && !routingAddress.startsWith("/")) + { + String[] parts = routingAddress.split("/",2); + ExchangeImpl exchange = _virtualHost.getExchange(parts[0]); + if(exchange != null) + { + return exchange.send(message, parts[1], instanceProperties, txn, postEnqueueAction); + } + } + else if(!routingAddress.contains("/")) + { + ExchangeImpl exchange = _virtualHost.getExchange(routingAddress); + if(exchange != null) + { + return exchange.send(message, "", instanceProperties, txn, postEnqueueAction); + } + } return 0; } else diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java index 0e8cfb72a1..a67cacf821 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/DirectExchange.java @@ -143,11 +143,11 @@ public class DirectExchange extends AbstractExchange } @Override - public List doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public List doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey(); - BindingSet bindings = _bindingsByKey.get(routingKey == null ? "" : routingKey); if(bindings != null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java index c7f81f1d15..b7810e8112 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/FanoutExchange.java @@ -79,7 +79,9 @@ public class FanoutExchange extends AbstractExchange } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { for(BindingImpl b : getBindings()) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java index 60df38af55..9d3ce0a415 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/HeadersExchange.java @@ -93,7 +93,9 @@ public class HeadersExchange extends AbstractExchange } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingKey, + final InstanceProperties instanceProperties) { if (_logger.isDebugEnabled()) { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java index e7236bdf3e..db73e842b8 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/exchange/TopicExchange.java @@ -157,12 +157,14 @@ public class TopicExchange extends AbstractExchange } @Override - public ArrayList doRoute(ServerMessage payload, final InstanceProperties instanceProperties) + public ArrayList doRoute(ServerMessage payload, + final String routingAddress, + final InstanceProperties instanceProperties) { - final String routingKey = payload.getRoutingKey() == null + final String routingKey = routingAddress == null ? "" - : payload.getRoutingKey(); + : routingAddress; final Collection matchedQueues = getMatchedQueues(Filterable.Factory.newInstance(payload,instanceProperties), routingKey); @@ -181,7 +183,7 @@ public class TopicExchange extends AbstractExchange if(queues == null || queues.isEmpty()) { - _logger.info("Message routing key: " + payload.getRoutingKey() + " No routes."); + _logger.info("Message routing key: " + routingAddress + " No routes."); } return queues; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java index 155f209ffb..1913f11ae1 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/MessageDestination.java @@ -32,14 +32,18 @@ public interface MessageDestination extends MessageNode /** * Routes a message + * + * * @param message the message to be routed + * @param routingAddress * @param instanceProperties the instance properties * @param txn the transaction to enqueue within * @param postEnqueueAction action to perform on the result of every enqueue (may be null) * @return the number of queues in which the message was enqueued performed */ > int send(M message, - InstanceProperties instanceProperties, - ServerTransaction txn, - Action postEnqueueAction); + final String routingAddress, + InstanceProperties instanceProperties, + ServerTransaction txn, + Action postEnqueueAction); } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java index 28491edaba..8c35af8be4 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/ServerMessage.java @@ -27,7 +27,7 @@ import java.nio.ByteBuffer; public interface ServerMessage extends EnqueueableMessage, MessageContentSource { - String getRoutingKey(); + String getInitialRoutingAddress(); AMQMessageHeader getMessageHeader(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java index 6375cfb07d..fdc2fa90a5 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/internal/InternalMessage.java @@ -44,6 +44,7 @@ public class InternalMessage extends AbstractServerMessageImpl handle, @@ -80,9 +81,9 @@ public class InternalMessage extends AbstractServerMessageImpl> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { txn.enqueue(this,message, new ServerTransaction.Action() { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 9814431beb..91148b1dc0 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -377,6 +377,7 @@ public abstract class QueueEntryImpl implements QueueEntry if (alternateExchange != null) { enqueues = alternateExchange.send(getMessage(), + getMessage().getInitialRoutingAddress(), getInstanceProperties(), txn, action); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java index be0704aeaa..fa75d41810 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/FanoutExchangeTest.java @@ -127,7 +127,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, null); - List result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + List result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -136,7 +136,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key2",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = True")); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -144,14 +144,14 @@ public class FanoutExchangeTest extends TestCase _exchange.deleteBinding("key",queue2); - result = _exchange.route(mockMessage(true),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(true), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to queue1 only", 1, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); @@ -160,7 +160,7 @@ public class FanoutExchangeTest extends TestCase _exchange.addBinding("key",queue2, Collections.singletonMap(AMQPFilterTypes.JMS_SELECTOR.toString(),(Object)"select = False")); - result = _exchange.route(mockMessage(false),InstanceProperties.EMPTY); + result = _exchange.route(mockMessage(false), "", InstanceProperties.EMPTY); assertEquals("Expected message to be routed to both queues", 2, result.size()); assertTrue("Expected queue1 to be routed to", result.contains(queue1)); assertTrue("Expected queue2 to be routed to", result.contains(queue2)); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java index e4e07813c7..76752de5d0 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java @@ -73,7 +73,7 @@ public class HeadersExchangeTest extends TestCase protected void routeAndTest(ServerMessage msg, AMQQueue... expected) throws Exception { - List results = _exchange.route(msg, InstanceProperties.EMPTY); + List results = _exchange.route(msg, "", InstanceProperties.EMPTY); List unexpected = new ArrayList(results); unexpected.removeAll(Arrays.asList(expected)); assertTrue("Message delivered to unexpected queues: " + unexpected, unexpected.isEmpty()); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java index 1c191b7b2e..21aa171551 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/exchange/TopicExchangeTest.java @@ -324,8 +324,8 @@ public class TopicExchangeTest extends QpidTestCase private int routeMessage(String routingKey, long messageNumber) { ServerMessage message = mock(ServerMessage.class); - when(message.getRoutingKey()).thenReturn(routingKey); - List queues = _exchange.route(message, InstanceProperties.EMPTY); + when(message.getInitialRoutingAddress()).thenReturn(routingKey); + List queues = _exchange.route(message, routingKey, InstanceProperties.EMPTY); MessageReference ref = mock(MessageReference.class); when(ref.getMessage()).thenReturn(message); when(message.newReference()).thenReturn(ref); diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java index bd43100cd2..5622383f3f 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMessageMetaDataType.java @@ -119,7 +119,7 @@ public class TestMessageMetaDataType implements MessageMetaDataType { @@ -123,7 +114,7 @@ public class MessageConverter_Internal_to_v0_10 implements MessageConverter> } }; - int enqueues = _currentMessage.getDestination().send(amqMessage, instanceProperties, _transaction, - immediate ? _immediateAction : _capacityCheckAction); + int enqueues = _currentMessage.getDestination().send(amqMessage, + amqMessage.getInitialRoutingAddress(), + instanceProperties, _transaction, + immediate ? _immediateAction : _capacityCheckAction + ); if(enqueues == 0) { handleUnroutableMessage(amqMessage); @@ -1574,7 +1577,7 @@ public class AMQChannel> if (altExchange == null) { _logger.debug("No alternate exchange configured for queue, must discard the message as unable to DLQ: delivery tag: " + deliveryTag); - _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getRoutingKey())); + _actor.message(_logSubject, ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), queue.getName(), msg.getInitialRoutingAddress())); } else diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 833f5fb06f..0ed63daf7c 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -71,7 +71,7 @@ public class AMQMessage extends AbstractServerMessageImpl headerProps = new LinkedHashMap(); for(String headerName : serverMsg.getMessageHeader().getHeaderNames()) @@ -184,6 +185,7 @@ public class MessageConverter_Internal_to_v0_8 implements MessageConverter { @@ -58,9 +65,210 @@ public class MessageConverter_v0_8_to_Internal implements MessageConverter names) + { + return _delegate.containsHeaders(names); + } + + @Override + public boolean containsHeader(final String name) + { + return _delegate.containsHeader(name); + } + + @Override + public Collection getHeaderNames() + { + return _delegate.getHeaderNames(); + } + } + + private static Object convertMessageBody(String mimeType, byte[] data) { if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java index d83665ad39..fc2c0d93d0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java @@ -76,7 +76,7 @@ public class ExchangeDestination implements ReceivingDestination, SendingDestina return null; }}; - int enqueues = _exchange.send(message, instanceProperties, txn, null); + int enqueues = _exchange.send(message, message.getInitialRoutingAddress(), instanceProperties, txn, null); return enqueues == 0 ? REJECTED : ACCEPTED; diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index d5a349304c..4540308f61 100755 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -563,6 +563,11 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData { return _properties == null ? null : _properties.getTo(); } + + public Map getHeadersAsMap() + { + return new HashMap(_appProperties); + } } } diff --git a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index 66094f52f0..36796851e0 100644 --- a/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -69,7 +69,7 @@ public class Message_1_0 extends AbstractServerMessageImpl convertToStoredMessage(final Message_1_0 serverMsg) + private StoredMessage convertToStoredMessage(final Message_1_0 serverMsg, + final VirtualHost vhost) { Object bodyObject = MessageConverter_from_1_0.convertBodyToObject(serverMsg); final byte[] messageContent = MessageConverter_from_1_0.convertToBody(bodyObject); final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg, + vhost, MessageConverter_from_1_0.getBodyMimeType(bodyObject), messageContent.length); @@ -119,25 +123,54 @@ public class MessageConverter_1_0_to_v0_10 implements MessageConverter { @@ -102,9 +104,45 @@ public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0> int send(final M message, - final InstanceProperties instanceProperties, - final ServerTransaction txn, - final Action postEnqueueAction) + final String routingAddress, + final InstanceProperties instanceProperties, + final ServerTransaction txn, + final Action postEnqueueAction) { @SuppressWarnings("unchecked") @@ -361,11 +363,19 @@ class ManagementNode implements MessageSource, MessageDestination ManagementNodeConsumer consumer = _consumers.get(message.getMessageHeader().getReplyTo()); + response.setInitialRoutingAddress(message.getMessageHeader().getReplyTo()); if(consumer != null) { // TODO - check same owner consumer.send(response); } + else + { + _virtualHost.getDefaultDestination().send(response, + message.getMessageHeader().getReplyTo(), InstanceProperties.EMPTY, + new AutoCommitTransaction(_virtualHost.getMessageStore()), + null); + } // TODO - route to a queue } diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java index 91c23ff384..6c421a9610 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQDestination.java @@ -112,6 +112,11 @@ public abstract class AMQDestination implements Destination, Referenceable _name = name; } + public boolean neverDeclare() + { + return false; + } + // ----- Fields required to support new address syntax ------- public enum DestSyntax { diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 8224c77ba9..29c2a3b279 100644 --- a/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -2864,16 +2864,16 @@ public abstract class AMQSession