summaryrefslogtreecommitdiff
path: root/qpid/java
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-15 14:26:21 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-15 14:26:21 +0000
commit4ad719f82cadddaef7e6693b43c3e767f3eb14cd (patch)
treee8fc825075bbed8e36a58e57d5a8abfaa74ec788 /qpid/java
parentad575a78103b300adaa6f616b6ed3b60dedf0f7e (diff)
downloadqpid-python-4ad719f82cadddaef7e6693b43c3e767f3eb14cd.tar.gz
QPID-4659 : [Java Broker] make message fomat conversions pluggable for different protcol versions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503267 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java32
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java59
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java140
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java271
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java226
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java125
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java138
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java225
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java2
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java24
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java81
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java245
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java31
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter23
-rw-r--r--qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType (renamed from qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType)0
15 files changed, 1376 insertions, 246 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java
new file mode 100644
index 0000000000..cf3860ba92
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.plugin;
+
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+
+public interface MessageConverter<M extends ServerMessage, N extends ServerMessage> extends Pluggable
+{
+ Class<M> getInputClass();
+ Class<N> getOutputClass();
+
+ N convert(M message, VirtualHost vhost);
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java
new file mode 100644
index 0000000000..81e5af179d
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java
@@ -0,0 +1,59 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol;
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+
+public class MessageConverterRegistry
+{
+ private static Map<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>> _converters =
+ new HashMap<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>>();
+
+ static
+ {
+
+ for(MessageConverter<? extends ServerMessage, ? extends ServerMessage> converter : (new QpidServiceLoader<MessageConverter>()).instancesOf(MessageConverter.class))
+ {
+ Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(converter.getInputClass());
+ if(map == null)
+ {
+ map = new HashMap<Class<? extends ServerMessage>, MessageConverter>();
+ _converters.put(converter.getInputClass(), map);
+ }
+ map.put(converter.getOutputClass(),converter);
+ }
+ }
+
+ public static <M extends ServerMessage,N extends ServerMessage> MessageConverter<M, N> getConverter(Class<M> from, Class<N> to)
+ {
+ Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(from);
+ if(map == null)
+ {
+ map = _converters.get(ServerMessage.class);
+ }
+ return map == null ? null : map.get(to);
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
new file mode 100644
index 0000000000..a70bd4b243
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.server.protocol.converter.v0_10_v1_0;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageProperties;
+
+public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<MessageTransferMessage>
+{
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+
+ @Override
+ protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage,
+ SectionEncoder sectionEncoder)
+ {
+ List<Section> sections = new ArrayList<Section>(3);
+ final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties();
+ final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties();
+
+ Header header = new Header();
+ if(deliveryProps != null)
+ {
+ header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT);
+ if(deliveryProps.hasPriority())
+ {
+ header.setPriority(UnsignedByte.valueOf((byte) deliveryProps.getPriority().getValue()));
+ }
+ if(deliveryProps.hasTtl())
+ {
+ header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl()));
+ }
+ sections.add(header);
+ }
+
+ Properties props = new Properties();
+
+ /*
+ TODO: the current properties are not currently set:
+
+ absoluteExpiryTime
+ creationTime
+ groupId
+ groupSequence
+ replyToGroupId
+ to
+ */
+
+ if(msgProps != null)
+ {
+ if(msgProps.hasContentEncoding())
+ {
+ props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding()));
+ }
+
+ if(msgProps.hasCorrelationId())
+ {
+ props.setCorrelationId(msgProps.getCorrelationId());
+ }
+
+ if(msgProps.hasMessageId())
+ {
+ props.setMessageId(msgProps.getMessageId());
+ }
+ if(msgProps.hasReplyTo())
+ {
+ props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey());
+ }
+ if(msgProps.hasContentType())
+ {
+ props.setContentType(Symbol.valueOf(msgProps.getContentType()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
+ {
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+ }
+
+ props.setSubject(serverMessage.getRoutingKey());
+
+ if(msgProps.hasUserId())
+ {
+ props.setUserId(new Binary(msgProps.getUserId()));
+ }
+
+ sections.add(props);
+
+ if(msgProps.getApplicationHeaders() != null)
+ {
+ sections.add(new ApplicationProperties(msgProps.getApplicationHeaders()));
+ }
+ }
+ return new MessageMetaData_1_0(sections, sectionEncoder);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-10 to v1-0";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
new file mode 100644
index 0000000000..c14896079f
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java
@@ -0,0 +1,271 @@
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.nio.ByteBuffer;
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.qpid.AMQPInvalidClassException;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryMode;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+
+public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage>
+{
+ private static final int BASIC_CLASS_ID = 60;
+
+ public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage,
+ VirtualHost vhost)
+ {
+ BasicContentHeaderProperties props = new BasicContentHeaderProperties();
+
+ Header header = messageTransferMessage.getHeader();
+ DeliveryProperties deliveryProps = header.getDeliveryProperties();
+ MessageProperties messageProps = header.getMessageProperties();
+
+ if(deliveryProps != null)
+ {
+ if(deliveryProps.hasDeliveryMode())
+ {
+ props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT
+ ? BasicContentHeaderProperties.PERSISTENT
+ : BasicContentHeaderProperties.NON_PERSISTENT));
+ }
+ if(deliveryProps.hasExpiration())
+ {
+ props.setExpiration(deliveryProps.getExpiration());
+ }
+ if(deliveryProps.hasPriority())
+ {
+ props.setPriority((byte) deliveryProps.getPriority().getValue());
+ }
+ if(deliveryProps.hasTimestamp())
+ {
+ props.setTimestamp(deliveryProps.getTimestamp());
+ }
+ }
+ if(messageProps != null)
+ {
+ if(messageProps.hasAppId())
+ {
+ props.setAppId(new AMQShortString(messageProps.getAppId()));
+ }
+ if(messageProps.hasContentType())
+ {
+ props.setContentType(messageProps.getContentType());
+ }
+ if(messageProps.hasCorrelationId())
+ {
+ props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId()));
+ }
+ if(messageProps.hasContentEncoding())
+ {
+ props.setEncoding(messageProps.getContentEncoding());
+ }
+ if(messageProps.hasMessageId())
+ {
+ props.setMessageId("ID:" + messageProps.getMessageId().toString());
+ }
+ if(messageProps.hasReplyTo())
+ {
+ ReplyTo replyTo = messageProps.getReplyTo();
+ String exchangeName = replyTo.getExchange();
+ String routingKey = replyTo.getRoutingKey();
+ if(exchangeName == null)
+ {
+ exchangeName = "";
+ }
+
+ Exchange exchange = vhost.getExchange(exchangeName);
+ String exchangeClass = exchange == null
+ ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString()
+ : exchange.getType().getName().asString();
+ props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='" + (routingKey == null
+ ? ""
+ : routingKey + "'"));
+
+ }
+ if(messageProps.hasUserId())
+ {
+ props.setUserId(new AMQShortString(messageProps.getUserId()));
+ }
+
+ if(messageProps.hasApplicationHeaders())
+ {
+ Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders());
+ if(messageProps.getApplicationHeaders().containsKey("x-jms-type"))
+ {
+ props.setType(String.valueOf(appHeaders.remove("x-jms-type")));
+ }
+
+ FieldTable ft = new FieldTable();
+ for(Map.Entry<String, Object> entry : appHeaders.entrySet())
+ {
+ try
+ {
+ ft.put(new AMQShortString(entry.getKey()), entry.getValue());
+ }
+ catch (AMQPInvalidClassException e)
+ {
+ // TODO
+ // log here, but ignore - just can;t convert
+ }
+ }
+ props.setHeaders(ft);
+
+ }
+ }
+
+ return props;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getInputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public Class<AMQMessage> getOutputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost)
+ {
+ return new AMQMessage(convertToStoredMessage(message, vhost));
+ }
+
+ private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message,
+ VirtualHost vhost)
+ {
+ final MessageMetaData metaData = convertMetaData(message, vhost);
+ return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>()
+ {
+ @Override
+ public MessageMetaData getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return message.getContent(dst, offsetInMessage);
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return message.getContent(offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost)
+ {
+ return new MessageMetaData(convertPublishBody(message),
+ convertContentHeaderBody(message, vhost),
+ 1,
+ message.getArrivalTime());
+ }
+
+ private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost)
+ {
+ BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
+ chb.setBodySize(message.getSize());
+ return chb;
+ }
+
+ private MessagePublishInfo convertPublishBody(MessageTransferMessage message)
+ {
+ DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
+ final AMQShortString exchangeName = (delvProps == null || delvProps.getExchange() == null)
+ ? null
+ : new AMQShortString(delvProps.getExchange());
+ final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null)
+ ? null
+ : new AMQShortString(delvProps.getRoutingKey());
+ final boolean immediate = delvProps != null && delvProps.getImmediate();
+ final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable();
+
+ return new MessagePublishInfo()
+ {
+ @Override
+ public AMQShortString getExchange()
+ {
+ return exchangeName;
+ }
+
+ @Override
+ public void setExchange(AMQShortString exchange)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ @Override
+ public boolean isMandatory()
+ {
+ return mandatory;
+ }
+
+ @Override
+ public AMQShortString getRoutingKey()
+ {
+ return routingKey;
+ }
+ };
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-10 to v0-8";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
new file mode 100644
index 0000000000..e1e8fbd9d3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_8_v0_10;
+
+import java.net.URISyntaxException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
+import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+import org.apache.qpid.transport.ReplyTo;
+import org.apache.qpid.url.AMQBindingURL;
+
+public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessage, MessageTransferMessage>
+{
+ @Override
+ public Class<AMQMessage> getInputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getOutputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost)
+ {
+ return new MessageTransferMessage(convertToStoredMessage(message_0_8), null);
+ }
+
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8)
+ {
+ final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(message_0_8);
+ return new StoredMessage<MessageMetaData_0_10>()
+ {
+ @Override
+ public MessageMetaData_0_10 getMetaData()
+ {
+ return messageMetaData_0_10;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return message_0_8.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return message_0_8.getContent(dst, offsetInMessage);
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return message_0_8.getContent(offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8)
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+ int size = (int) message_0_8.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ message_0_8.getContent(body, 0);
+ body.flip();
+
+ BasicContentHeaderProperties properties =
+ (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
+
+ final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
+ if(exchange != null)
+ {
+ deliveryProps.setExchange(exchange.toString());
+ }
+
+ deliveryProps.setExpiration(message_0_8.getExpiration());
+ deliveryProps.setImmediate(message_0_8.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
+ deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
+ deliveryProps.setTimestamp(properties.getTimestamp());
+
+ messageProps.setContentEncoding(properties.getEncodingAsString());
+ messageProps.setContentLength(size);
+ if(properties.getAppId() != null)
+ {
+ messageProps.setAppId(properties.getAppId().getBytes());
+ }
+ messageProps.setContentType(properties.getContentTypeAsString());
+ if(properties.getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+ }
+
+ if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ {
+ String origReplyToString = properties.getReplyTo().asString();
+ ReplyTo replyTo = new ReplyTo();
+ // if the string looks like a binding URL, then attempt to parse it...
+ try
+ {
+ AMQBindingURL burl = new AMQBindingURL(origReplyToString);
+ AMQShortString routingKey = burl.getRoutingKey();
+ if(routingKey != null)
+ {
+ replyTo.setRoutingKey(routingKey.asString());
+ }
+
+ AMQShortString exchangeName = burl.getExchangeName();
+ if(exchangeName != null)
+ {
+ replyTo.setExchange(exchangeName.asString());
+ }
+ }
+ catch (URISyntaxException e)
+ {
+ replyTo.setRoutingKey(origReplyToString);
+ }
+ messageProps.setReplyTo(replyTo);
+
+ }
+
+ if(properties.getMessageId() != null)
+ {
+ try
+ {
+ String messageIdAsString = properties.getMessageIdAsString();
+ if(messageIdAsString.startsWith("ID:"))
+ {
+ messageIdAsString = messageIdAsString.substring(3);
+ }
+ UUID uuid = UUID.fromString(messageIdAsString);
+ messageProps.setMessageId(uuid);
+ }
+ catch(IllegalArgumentException e)
+ {
+ // ignore - can't parse
+ }
+ }
+
+
+
+ if(properties.getUserId() != null)
+ {
+ messageProps.setUserId(properties.getUserId().getBytes());
+ }
+
+ FieldTable fieldTable = properties.getHeaders();
+
+ Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
+
+ if(properties.getType() != null)
+ {
+ appHeaders.put("x-jms-type", properties.getTypeAsString());
+ }
+
+
+ messageProps.setApplicationHeaders(appHeaders);
+
+ Header header = new Header(deliveryProps, messageProps, null);
+
+
+ return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime());
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-8 to v0-10";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
new file mode 100644
index 0000000000..0d9d59ff56
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java
@@ -0,0 +1,125 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.converter.v0_8_v1_0;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.Symbol;
+import org.apache.qpid.amqp_1_0.type.UnsignedByte;
+import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
+import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties;
+import org.apache.qpid.amqp_1_0.type.messaging.Header;
+import org.apache.qpid.amqp_1_0.type.messaging.Properties;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0;
+import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0;
+
+public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage>
+{
+ @Override
+ public Class<AMQMessage> getInputClass()
+ {
+ return AMQMessage.class;
+ }
+
+ protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder)
+ {
+
+ List<Section> sections = new ArrayList<Section>(3);
+
+ Header header = new Header();
+
+ header.setDurable(serverMessage.isPersistent());
+
+ BasicContentHeaderProperties contentHeader =
+ (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties();
+
+ header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority()));
+ final long expiration = serverMessage.getExpiration();
+ final long arrivalTime = serverMessage.getArrivalTime();
+
+ if(expiration > arrivalTime)
+ {
+ header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime));
+ }
+ sections.add(header);
+
+
+ Properties props = new Properties();
+
+ /*
+ TODO: The following properties are not currently set:
+
+ creationTime
+ groupId
+ groupSequence
+ replyToGroupId
+ to
+ */
+
+ props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString()));
+
+ props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString()));
+
+ // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client
+ if(props.getContentType() == Symbol.valueOf("application/java-object-stream"))
+ {
+ props.setContentType(Symbol.valueOf("application/x-java-serialized-object"));
+ }
+
+ final AMQShortString correlationId = contentHeader.getCorrelationId();
+ if(correlationId != null)
+ {
+ props.setCorrelationId(new Binary(correlationId.getBytes()));
+ }
+
+ final AMQShortString messageId = contentHeader.getMessageId();
+ if(messageId != null)
+ {
+ props.setMessageId(new Binary(messageId.getBytes()));
+ }
+ props.setReplyTo(String.valueOf(contentHeader.getReplyTo()));
+
+ props.setSubject(serverMessage.getRoutingKey());
+ if(contentHeader.getUserId() != null)
+ {
+ props.setUserId(new Binary(contentHeader.getUserId().getBytes()));
+ }
+
+ sections.add(props);
+
+ sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders())));
+
+ return new MessageMetaData_1_0(sections, sectionEncoder);
+ }
+
+ @Override
+ public String getType()
+ {
+ return "v0-8 to v1-0";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
new file mode 100644
index 0000000000..c6ae0c6e47
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java
@@ -0,0 +1,138 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.transport.Header;
+import org.apache.qpid.transport.MessageDeliveryPriority;
+import org.apache.qpid.transport.MessageProperties;
+
+public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage>
+{
+ @Override
+ public Class<ServerMessage> getInputClass()
+ {
+ return ServerMessage.class;
+ }
+
+ @Override
+ public Class<MessageTransferMessage> getOutputClass()
+ {
+ return MessageTransferMessage.class;
+ }
+
+ @Override
+ public MessageTransferMessage convert(ServerMessage serverMsg, VirtualHost vhost)
+ {
+ return new MessageTransferMessage(convertToStoredMessage(serverMsg), null);
+ }
+
+ private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final ServerMessage serverMsg)
+ {
+ final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg);
+
+ return new StoredMessage<MessageMetaData_0_10>()
+ {
+ @Override
+ public MessageMetaData_0_10 getMetaData()
+ {
+ return messageMetaData_0_10;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMsg.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ return serverMsg.getContent(dst, offsetInMessage);
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ return serverMsg.getContent(offsetInMessage, size);
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void remove()
+ {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+
+ private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg)
+ {
+ DeliveryProperties deliveryProps = new DeliveryProperties();
+ MessageProperties messageProps = new MessageProperties();
+
+ int size = (int) serverMsg.getSize();
+ ByteBuffer body = ByteBuffer.allocate(size);
+ serverMsg.getContent(body, 0);
+ body.flip();
+
+
+ deliveryProps.setExpiration(serverMsg.getExpiration());
+ deliveryProps.setImmediate(serverMsg.isImmediate());
+ deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
+ deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
+ deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+
+ messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
+ messageProps.setContentLength(size);
+ messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
+ if(serverMsg.getMessageHeader().getCorrelationId() != null)
+ {
+ messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
+ }
+
+ Header header = new Header(deliveryProps, messageProps, null);
+ return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime());
+ }
+
+ @Override
+ public String getType()
+ {
+ return "Unknown to v0-10";
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
index 87d482c44c..c6bceb6ac7 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java
@@ -33,7 +33,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
-import org.apache.qpid.server.protocol.v0_8.AMQMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.queue.AMQQueue;
@@ -48,21 +49,16 @@ import org.apache.qpid.transport.Header;
import org.apache.qpid.transport.MessageAcceptMode;
import org.apache.qpid.transport.MessageAcquireMode;
import org.apache.qpid.transport.MessageCreditUnit;
-import org.apache.qpid.transport.MessageDeliveryPriority;
import org.apache.qpid.transport.MessageFlowMode;
import org.apache.qpid.transport.MessageProperties;
import org.apache.qpid.transport.MessageTransfer;
import org.apache.qpid.transport.Method;
import org.apache.qpid.transport.Option;
-import org.apache.qpid.transport.ReplyTo;
import org.apache.qpid.transport.Struct;
-import org.apache.qpid.url.AMQBindingURL;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT;
-import java.net.URISyntaxException;
-import java.nio.ByteBuffer;
import java.text.MessageFormat;
import java.util.Arrays;
import java.util.Collections;
@@ -209,10 +205,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
return false;
}
- if (_noLocal && entry.getMessage() instanceof MessageTransferMessage)
+ if (entry.getMessage() instanceof MessageTransferMessage)
{
- Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
- if (connectionRef != null && connectionRef == _session.getReference())
+ if(_noLocal)
+ {
+ Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference();
+ if (connectionRef != null && connectionRef == _session.getReference())
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ // no interest in messages we can't convert
+ if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null)
{
return false;
}
@@ -348,200 +355,72 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
DeliveryProperties deliveryProps;
MessageProperties messageProps = null;
+ MessageTransferMessage msg;
+
if(serverMsg instanceof MessageTransferMessage)
{
- MessageTransferMessage msg = (MessageTransferMessage) serverMsg;
- DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
- messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
-
- deliveryProps = new DeliveryProperties();
- if(origDeliveryProps != null)
- {
- if(origDeliveryProps.hasDeliveryMode())
- {
- deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
- }
- if(origDeliveryProps.hasExchange())
- {
- deliveryProps.setExchange(origDeliveryProps.getExchange());
- }
- if(origDeliveryProps.hasExpiration())
- {
- deliveryProps.setExpiration(origDeliveryProps.getExpiration());
- }
- if(origDeliveryProps.hasPriority())
- {
- deliveryProps.setPriority(origDeliveryProps.getPriority());
- }
- if(origDeliveryProps.hasRoutingKey())
- {
- deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
- }
- if(origDeliveryProps.hasTimestamp())
- {
- deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
- }
- if(origDeliveryProps.hasTtl())
- {
- deliveryProps.setTtl(origDeliveryProps.getTtl());
- }
-
+ msg = (MessageTransferMessage) serverMsg;
- }
-
- deliveryProps.setRedelivered(entry.isRedelivered());
-
- if(_trace != null && messageProps == null)
- {
- messageProps = new MessageProperties();
- }
-
- Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
-
-
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
}
- else if(serverMsg instanceof AMQMessage)
+ else
{
- AMQMessage message_0_8 = (AMQMessage) serverMsg;
- deliveryProps = new DeliveryProperties();
- messageProps = new MessageProperties();
+ MessageConverter converter =
+ MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class);
+
- int size = (int) message_0_8.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- message_0_8.getContent(body, 0);
- body.flip();
+ msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost());
+ }
+ DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties();
+ messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties();
- BasicContentHeaderProperties properties =
- (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties();
- final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange();
- if(exchange != null)
+ deliveryProps = new DeliveryProperties();
+ if(origDeliveryProps != null)
+ {
+ if(origDeliveryProps.hasDeliveryMode())
{
- deliveryProps.setExchange(exchange.toString());
+ deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode());
}
- deliveryProps.setExpiration(message_0_8.getExpiration());
- deliveryProps.setImmediate(message_0_8.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority()));
- deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setRoutingKey(message_0_8.getRoutingKey());
- deliveryProps.setTimestamp(properties.getTimestamp());
-
- messageProps.setContentEncoding(properties.getEncodingAsString());
- messageProps.setContentLength(size);
- if(properties.getAppId() != null)
+ if(origDeliveryProps.hasExchange())
{
- messageProps.setAppId(properties.getAppId().getBytes());
+ deliveryProps.setExchange(origDeliveryProps.getExchange());
}
- messageProps.setContentType(properties.getContentTypeAsString());
- if(properties.getCorrelationId() != null)
+ if(origDeliveryProps.hasExpiration())
{
- messageProps.setCorrelationId(properties.getCorrelationId().getBytes());
+ deliveryProps.setExpiration(origDeliveryProps.getExpiration());
}
-
- if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0)
+ if(origDeliveryProps.hasPriority())
{
- String origReplyToString = properties.getReplyTo().asString();
- ReplyTo replyTo = new ReplyTo();
- // if the string looks like a binding URL, then attempt to parse it...
- try
- {
- AMQBindingURL burl = new AMQBindingURL(origReplyToString);
- AMQShortString routingKey = burl.getRoutingKey();
- if(routingKey != null)
- {
- replyTo.setRoutingKey(routingKey.asString());
- }
-
- AMQShortString exchangeName = burl.getExchangeName();
- if(exchangeName != null)
- {
- replyTo.setExchange(exchangeName.asString());
- }
- }
- catch (URISyntaxException e)
- {
- replyTo.setRoutingKey(origReplyToString);
- }
- messageProps.setReplyTo(replyTo);
-
+ deliveryProps.setPriority(origDeliveryProps.getPriority());
}
-
- if(properties.getMessageId() != null)
+ if(origDeliveryProps.hasRoutingKey())
{
- try
- {
- String messageIdAsString = properties.getMessageIdAsString();
- if(messageIdAsString.startsWith("ID:"))
- {
- messageIdAsString = messageIdAsString.substring(3);
- }
- UUID uuid = UUID.fromString(messageIdAsString);
- messageProps.setMessageId(uuid);
- }
- catch(IllegalArgumentException e)
- {
- // ignore - can't parse
- }
+ deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey());
}
-
-
-
- if(properties.getUserId() != null)
+ if(origDeliveryProps.hasTimestamp())
{
- messageProps.setUserId(properties.getUserId().getBytes());
+ deliveryProps.setTimestamp(origDeliveryProps.getTimestamp());
}
-
- FieldTable fieldTable = properties.getHeaders();
-
- Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable);
-
- if(properties.getType() != null)
+ if(origDeliveryProps.hasTtl())
{
- appHeaders.put("x-jms-type", properties.getTypeAsString());
+ deliveryProps.setTtl(origDeliveryProps.getTtl());
}
- messageProps.setApplicationHeaders(appHeaders);
-
- Header header = new Header(deliveryProps, messageProps, null);
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
}
- else
- {
-
- deliveryProps = new DeliveryProperties();
- messageProps = new MessageProperties();
-
- int size = (int) serverMsg.getSize();
- ByteBuffer body = ByteBuffer.allocate(size);
- serverMsg.getContent(body, 0);
- body.flip();
+ deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setExpiration(serverMsg.getExpiration());
- deliveryProps.setImmediate(serverMsg.isImmediate());
- deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority()));
- deliveryProps.setRedelivered(entry.isRedelivered());
- deliveryProps.setRoutingKey(serverMsg.getRoutingKey());
- deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp());
+ if(_trace != null && messageProps == null)
+ {
+ messageProps = new MessageProperties();
+ }
- messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding());
- messageProps.setContentLength(size);
- messageProps.setContentType(serverMsg.getMessageHeader().getMimeType());
- if(serverMsg.getMessageHeader().getCorrelationId() != null)
- {
- messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes());
- }
+ Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties());
- // TODO - ReplyTo
- Header header = new Header(deliveryProps, messageProps, null);
- xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED)
- : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body);
- }
+ xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED)
+ : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody());
boolean excludeDueToFederation = false;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
index 3567a005d2..416a4da183 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java
@@ -152,7 +152,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData>
}
- public MessagePublishInfo getMessagePublishInfo() throws AMQException
+ public MessagePublishInfo getMessagePublishInfo()
{
return getMessageMetaData().getMessagePublishInfo();
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
index 23fc335dc7..ae8e019bd0 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java
@@ -35,6 +35,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.SubscriptionActor;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.protocol.AMQSessionModel;
@@ -482,16 +483,27 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
}
}
- if (_noLocal)
+ if(entry.getMessage() instanceof AMQMessage)
{
- AMQMessage message = (AMQMessage) entry.getMessage();
+ if (_noLocal)
+ {
+ AMQMessage message = (AMQMessage) entry.getMessage();
- final Object publisherReference = message.getConnectionIdentifier();
+ final Object publisherReference = message.getConnectionIdentifier();
- // We don't want local messages so check to see if message is one we sent
- Object localReference = getProtocolSession().getReference();
+ // We don't want local messages so check to see if message is one we sent
+ Object localReference = getProtocolSession().getReference();
- if(publisherReference != null && publisherReference.equals(localReference))
+ if(publisherReference != null && publisherReference.equals(localReference))
+ {
+ return false;
+ }
+ }
+ }
+ else
+ {
+ // No interest in messages we can't convert to AMQMessage
+ if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null)
{
return false;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
index 9e7d04e0fb..adba8e665a 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java
@@ -27,19 +27,19 @@ import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicCancelOkBody;
-import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicGetOkBody;
import org.apache.qpid.framing.BasicReturnBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.MethodRegistry;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
import org.apache.qpid.server.message.MessageContentSource;
-import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
-import org.apache.qpid.transport.DeliveryProperties;
import java.io.DataOutput;
import java.io.IOException;
@@ -67,33 +67,34 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
throws AMQException
{
- AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
- writeMessageDelivery(entry, channelId, deliverBody);
+ AMQMessage msg = convertToAMQMessage(entry);
+ AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag);
+ writeMessageDelivery(msg, channelId, deliverBody);
}
-
- private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
- throws AMQException
+ private AMQMessage convertToAMQMessage(QueueEntry entry)
{
- if(entry.getMessage() instanceof AMQMessage)
+ ServerMessage serverMessage = entry.getMessage();
+ if(serverMessage instanceof AMQMessage)
{
- return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ return (AMQMessage) serverMessage;
}
else
{
- final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost());
- ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID);
- chb.setBodySize(message.getSize());
- return chb;
+ return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost());
}
}
+ private <M extends ServerMessage> MessageConverter<M, AMQMessage> getMessageConverter(M message)
+ {
+ Class<M> clazz = (Class<M>) message.getClass();
+ return MessageConverterRegistry.getConverter(clazz, AMQMessage.class);
+ }
- private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody)
throws AMQException
{
- writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody);
}
private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
@@ -188,35 +189,23 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
{
AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
- writeMessageDelivery(entry, channelId, deliver);
+ writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver);
}
- private AMQBody createEncodedDeliverBody(QueueEntry entry,
- final long deliveryTag,
- final AMQShortString consumerTag)
+ private AMQBody createEncodedDeliverBody(AMQMessage message,
+ boolean isRedelivered,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
throws AMQException
{
final AMQShortString exchangeName;
final AMQShortString routingKey;
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
-
- final boolean isRedelivered = entry.isRedelivered();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered);
return returnBlock;
@@ -291,20 +280,10 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter
final AMQShortString exchangeName;
final AMQShortString routingKey;
- if(entry.getMessage() instanceof AMQMessage)
- {
- final AMQMessage message = (AMQMessage) entry.getMessage();
- final MessagePublishInfo pb = message.getMessagePublishInfo();
- exchangeName = pb.getExchange();
- routingKey = pb.getRoutingKey();
- }
- else
- {
- MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
- DeliveryProperties delvProps = message.getHeader().getDeliveryProperties();
- exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
- routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
- }
+ final AMQMessage message = convertToAMQMessage(entry);
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
final boolean isRedelivered = entry.isRedelivered();
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
new file mode 100644
index 0000000000..be9b0323a3
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java
@@ -0,0 +1,245 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.io.EOFException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
+import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl;
+import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Section;
+import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry;
+import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue;
+import org.apache.qpid.amqp_1_0.type.messaging.Data;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.store.StoreFuture;
+import org.apache.qpid.server.store.StoredMessage;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.transport.codec.BBDecoder;
+import org.apache.qpid.typedmessage.TypedBytesContentReader;
+import org.apache.qpid.typedmessage.TypedBytesFormatException;
+
+public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0>
+{
+ private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance()
+ .registerTransportLayer()
+ .registerMessagingLayer()
+ .registerTransactionLayer()
+ .registerSecurityLayer();
+
+ @Override
+ public final Class<Message_1_0> getOutputClass()
+ {
+ return Message_1_0.class;
+ }
+
+ @Override
+ public final Message_1_0 convert(M message, VirtualHost vhost)
+ {
+
+ SectionEncoder sectionEncoder = new SectionEncoderImpl(_typeRegistry);
+ return new Message_1_0(convertToStoredMessage(message, sectionEncoder));
+ }
+
+
+ private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder)
+ {
+ final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, sectionEncoder);
+ return convertServerMessage(metaData, serverMessage, sectionEncoder);
+ }
+
+ abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage, SectionEncoder sectionEncoder);
+
+
+ private static Section convertMessageBody(String mimeType, byte[] data)
+ {
+ if("text/plain".equals(mimeType) || "text/xml".equals(mimeType))
+ {
+ String text = new String(data);
+ return new AmqpValue(text);
+ }
+ else if("jms/map-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ LinkedHashMap map = new LinkedHashMap();
+ final int entries = reader.readIntImpl();
+ for (int i = 0; i < entries; i++)
+ {
+ try
+ {
+ String propName = reader.readStringImpl();
+ Object value = reader.readObject();
+ map.put(propName, value);
+ }
+ catch (EOFException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new IllegalArgumentException(e);
+ }
+
+ }
+
+ return new AmqpValue(map);
+
+ }
+ else if("amqp/map".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readMap());
+
+ }
+ else if("amqp/list".equals(mimeType))
+ {
+ BBDecoder decoder = new BBDecoder();
+ decoder.init(ByteBuffer.wrap(data));
+ return new AmqpValue(decoder.readList());
+ }
+ else if("jms/stream-message".equals(mimeType))
+ {
+ TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data));
+
+ List list = new ArrayList();
+ while (reader.remaining() != 0)
+ {
+ try
+ {
+ list.add(reader.readObject());
+ }
+ catch (TypedBytesFormatException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ catch (EOFException e)
+ {
+ throw new RuntimeException(e); // TODO - Implement
+ }
+ }
+ return new AmqpValue(list);
+ }
+ else
+ {
+ return new Data(new Binary(data));
+
+ }
+ }
+
+ private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData,
+ final ServerMessage serverMessage,
+ SectionEncoder sectionEncoder)
+ {
+ final String mimeType = serverMessage.getMessageHeader().getMimeType();
+ byte[] data = new byte[(int) serverMessage.getSize()];
+ serverMessage.getContent(ByteBuffer.wrap(data), 0);
+
+ Section bodySection = convertMessageBody(mimeType, data);
+
+ final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder);
+
+ return new StoredMessage<MessageMetaData_1_0>()
+ {
+ @Override
+ public MessageMetaData_1_0 getMetaData()
+ {
+ return metaData;
+ }
+
+ @Override
+ public long getMessageNumber()
+ {
+ return serverMessage.getMessageNumber();
+ }
+
+ @Override
+ public void addContent(int offsetInMessage, ByteBuffer src)
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public int getContent(int offsetInMessage, ByteBuffer dst)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ int size;
+ if(dst.remaining()<buf.remaining())
+ {
+ buf.limit(dst.remaining());
+ size = dst.remaining();
+ }
+ else
+ {
+ size = buf.remaining();
+ }
+ dst.put(buf);
+ return size;
+ }
+
+ @Override
+ public ByteBuffer getContent(int offsetInMessage, int size)
+ {
+ ByteBuffer buf = allData.duplicate();
+ buf.position(offsetInMessage);
+ buf = buf.slice();
+ if(size < buf.remaining())
+ {
+ buf.limit(size);
+ }
+ return buf;
+ }
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void remove()
+ {
+ serverMessage.getStoredMessage().remove();
+ }
+ };
+ }
+
+ private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder)
+ {
+ int headerSize = (int) metaData.getStorableSize();
+
+ sectionEncoder.reset();
+ sectionEncoder.encodeObject(bodySection);
+ Binary dataEncoding = sectionEncoder.getEncoding();
+
+ final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength());
+ metaData.writeToBuffer(0,allData);
+ allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength());
+ return allData;
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index 36eac879b9..7d0e3e3dbb 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -59,6 +59,8 @@ import org.apache.qpid.amqp_1_0.type.transport.Transfer;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.server.plugin.MessageConverter;
+import org.apache.qpid.server.protocol.MessageConverterRegistry;
import org.apache.qpid.typedmessage.TypedBytesContentReader;
import org.apache.qpid.typedmessage.TypedBytesFormatException;
import org.apache.qpid.server.filter.FilterManager;
@@ -168,9 +170,18 @@ class Subscription_1_0 implements Subscription
public boolean hasInterest(final QueueEntry entry)
{
- return !(_noLocal && (entry.getMessage() instanceof Message_1_0)
- && ((Message_1_0)entry.getMessage()).getSession() == getSession())
- && checkFilters(entry);
+ if(entry.getMessage() instanceof Message_1_0)
+ {
+ if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession())
+ {
+ return false;
+ }
+ }
+ else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null)
+ {
+ return false;
+ }
+ return checkFilters(entry);
}
@@ -221,18 +232,8 @@ class Subscription_1_0 implements Subscription
}
else
{
- if(serverMessage instanceof AMQMessage)
- {
- message = new Message_1_0(convert08Message((AMQMessage)serverMessage));
- }
- else if(serverMessage instanceof MessageTransferMessage)
- {
- message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage));
- }
- else
- {
- return;
- }
+ final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class);
+ message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost());
}
Transfer transfer = new Transfer();
diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
new file mode 100644
index 0000000000..64c497d433
--- /dev/null
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter
@@ -0,0 +1,23 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10
+org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8
+org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0
+org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0
+org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10
diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
index 9aa1d4ce11..9aa1d4ce11 100644
--- a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
+++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType