diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 14:26:21 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 14:26:21 +0000 |
| commit | 4ad719f82cadddaef7e6693b43c3e767f3eb14cd (patch) | |
| tree | e8fc825075bbed8e36a58e57d5a8abfaa74ec788 /qpid/java | |
| parent | ad575a78103b300adaa6f616b6ed3b60dedf0f7e (diff) | |
| download | qpid-python-4ad719f82cadddaef7e6693b43c3e767f3eb14cd.tar.gz | |
QPID-4659 : [Java Broker] make message fomat conversions pluggable for different protcol versions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503267 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
15 files changed, 1376 insertions, 246 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java new file mode 100644 index 0000000000..cf3860ba92 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageConverter.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; + +public interface MessageConverter<M extends ServerMessage, N extends ServerMessage> extends Pluggable +{ + Class<M> getInputClass(); + Class<N> getOutputClass(); + + N convert(M message, VirtualHost vhost); +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java new file mode 100644 index 0000000000..81e5af179d --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/MessageConverterRegistry.java @@ -0,0 +1,59 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.protocol; + +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.plugin.QpidServiceLoader; + +public class MessageConverterRegistry +{ + private static Map<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>> _converters = + new HashMap<Class<? extends ServerMessage>, Map<Class<? extends ServerMessage>, MessageConverter>>(); + + static + { + + for(MessageConverter<? extends ServerMessage, ? extends ServerMessage> converter : (new QpidServiceLoader<MessageConverter>()).instancesOf(MessageConverter.class)) + { + Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(converter.getInputClass()); + if(map == null) + { + map = new HashMap<Class<? extends ServerMessage>, MessageConverter>(); + _converters.put(converter.getInputClass(), map); + } + map.put(converter.getOutputClass(),converter); + } + } + + public static <M extends ServerMessage,N extends ServerMessage> MessageConverter<M, N> getConverter(Class<M> from, Class<N> to) + { + Map<Class<? extends ServerMessage>, MessageConverter> map = _converters.get(from); + if(map == null) + { + map = _converters.get(ServerMessage.class); + } + return map == null ? null : map.get(to); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java new file mode 100644 index 0000000000..a70bd4b243 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_10_v1_0/MessageConverter_0_10_to_1_0.java @@ -0,0 +1,140 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.server.protocol.converter.v0_10_v1_0; + +import java.util.ArrayList; +import java.util.List; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; +import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageProperties; + +public class MessageConverter_0_10_to_1_0 extends MessageConverter_to_1_0<MessageTransferMessage> +{ + @Override + public Class<MessageTransferMessage> getInputClass() + { + return MessageTransferMessage.class; + } + + + @Override + protected MessageMetaData_1_0 convertMetaData(MessageTransferMessage serverMessage, + SectionEncoder sectionEncoder) + { + List<Section> sections = new ArrayList<Section>(3); + final MessageProperties msgProps = serverMessage.getHeader().getMessageProperties(); + final DeliveryProperties deliveryProps = serverMessage.getHeader().getDeliveryProperties(); + + Header header = new Header(); + if(deliveryProps != null) + { + header.setDurable(deliveryProps.hasDeliveryMode() && deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT); + if(deliveryProps.hasPriority()) + { + header.setPriority(UnsignedByte.valueOf((byte) deliveryProps.getPriority().getValue())); + } + if(deliveryProps.hasTtl()) + { + header.setTtl(UnsignedInteger.valueOf(deliveryProps.getTtl())); + } + sections.add(header); + } + + Properties props = new Properties(); + + /* + TODO: the current properties are not currently set: + + absoluteExpiryTime + creationTime + groupId + groupSequence + replyToGroupId + to + */ + + if(msgProps != null) + { + if(msgProps.hasContentEncoding()) + { + props.setContentEncoding(Symbol.valueOf(msgProps.getContentEncoding())); + } + + if(msgProps.hasCorrelationId()) + { + props.setCorrelationId(msgProps.getCorrelationId()); + } + + if(msgProps.hasMessageId()) + { + props.setMessageId(msgProps.getMessageId()); + } + if(msgProps.hasReplyTo()) + { + props.setReplyTo(msgProps.getReplyTo().getExchange()+"/"+msgProps.getReplyTo().getRoutingKey()); + } + if(msgProps.hasContentType()) + { + props.setContentType(Symbol.valueOf(msgProps.getContentType())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) + { + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + } + + props.setSubject(serverMessage.getRoutingKey()); + + if(msgProps.hasUserId()) + { + props.setUserId(new Binary(msgProps.getUserId())); + } + + sections.add(props); + + if(msgProps.getApplicationHeaders() != null) + { + sections.add(new ApplicationProperties(msgProps.getApplicationHeaders())); + } + } + return new MessageMetaData_1_0(sections, sectionEncoder); + } + + @Override + public String getType() + { + return "v0-10 to v1-0"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java new file mode 100644 index 0000000000..c14896079f --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_10_to_0_8.java @@ -0,0 +1,271 @@ +package org.apache.qpid.server.protocol.converter.v0_8_v0_10; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.Map; +import org.apache.qpid.AMQPInvalidClassException; +import org.apache.qpid.exchange.ExchangeDefaults; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.framing.abstraction.MessagePublishInfo; +import org.apache.qpid.server.exchange.Exchange; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v0_8.MessageMetaData; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryMode; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; + +public class MessageConverter_0_10_to_0_8 implements MessageConverter<MessageTransferMessage, AMQMessage> +{ + private static final int BASIC_CLASS_ID = 60; + + public static BasicContentHeaderProperties convertContentHeaderProperties(MessageTransferMessage messageTransferMessage, + VirtualHost vhost) + { + BasicContentHeaderProperties props = new BasicContentHeaderProperties(); + + Header header = messageTransferMessage.getHeader(); + DeliveryProperties deliveryProps = header.getDeliveryProperties(); + MessageProperties messageProps = header.getMessageProperties(); + + if(deliveryProps != null) + { + if(deliveryProps.hasDeliveryMode()) + { + props.setDeliveryMode((byte) (deliveryProps.getDeliveryMode() == MessageDeliveryMode.PERSISTENT + ? BasicContentHeaderProperties.PERSISTENT + : BasicContentHeaderProperties.NON_PERSISTENT)); + } + if(deliveryProps.hasExpiration()) + { + props.setExpiration(deliveryProps.getExpiration()); + } + if(deliveryProps.hasPriority()) + { + props.setPriority((byte) deliveryProps.getPriority().getValue()); + } + if(deliveryProps.hasTimestamp()) + { + props.setTimestamp(deliveryProps.getTimestamp()); + } + } + if(messageProps != null) + { + if(messageProps.hasAppId()) + { + props.setAppId(new AMQShortString(messageProps.getAppId())); + } + if(messageProps.hasContentType()) + { + props.setContentType(messageProps.getContentType()); + } + if(messageProps.hasCorrelationId()) + { + props.setCorrelationId(new AMQShortString(messageProps.getCorrelationId())); + } + if(messageProps.hasContentEncoding()) + { + props.setEncoding(messageProps.getContentEncoding()); + } + if(messageProps.hasMessageId()) + { + props.setMessageId("ID:" + messageProps.getMessageId().toString()); + } + if(messageProps.hasReplyTo()) + { + ReplyTo replyTo = messageProps.getReplyTo(); + String exchangeName = replyTo.getExchange(); + String routingKey = replyTo.getRoutingKey(); + if(exchangeName == null) + { + exchangeName = ""; + } + + Exchange exchange = vhost.getExchange(exchangeName); + String exchangeClass = exchange == null + ? ExchangeDefaults.DIRECT_EXCHANGE_CLASS.asString() + : exchange.getType().getName().asString(); + props.setReplyTo(exchangeClass + "://" + exchangeName + "//?routingkey='" + (routingKey == null + ? "" + : routingKey + "'")); + + } + if(messageProps.hasUserId()) + { + props.setUserId(new AMQShortString(messageProps.getUserId())); + } + + if(messageProps.hasApplicationHeaders()) + { + Map<String, Object> appHeaders = new HashMap<String, Object>(messageProps.getApplicationHeaders()); + if(messageProps.getApplicationHeaders().containsKey("x-jms-type")) + { + props.setType(String.valueOf(appHeaders.remove("x-jms-type"))); + } + + FieldTable ft = new FieldTable(); + for(Map.Entry<String, Object> entry : appHeaders.entrySet()) + { + try + { + ft.put(new AMQShortString(entry.getKey()), entry.getValue()); + } + catch (AMQPInvalidClassException e) + { + // TODO + // log here, but ignore - just can;t convert + } + } + props.setHeaders(ft); + + } + } + + return props; + } + + @Override + public Class<MessageTransferMessage> getInputClass() + { + return MessageTransferMessage.class; + } + + @Override + public Class<AMQMessage> getOutputClass() + { + return AMQMessage.class; + } + + @Override + public AMQMessage convert(MessageTransferMessage message, VirtualHost vhost) + { + return new AMQMessage(convertToStoredMessage(message, vhost)); + } + + private StoredMessage<MessageMetaData> convertToStoredMessage(final MessageTransferMessage message, + VirtualHost vhost) + { + final MessageMetaData metaData = convertMetaData(message, vhost); + return new StoredMessage<org.apache.qpid.server.protocol.v0_8.MessageMetaData>() + { + @Override + public MessageMetaData getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return message.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return message.getContent(dst, offsetInMessage); + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return message.getContent(offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData convertMetaData(MessageTransferMessage message, VirtualHost vhost) + { + return new MessageMetaData(convertPublishBody(message), + convertContentHeaderBody(message, vhost), + 1, + message.getArrivalTime()); + } + + private ContentHeaderBody convertContentHeaderBody(MessageTransferMessage message, VirtualHost vhost) + { + BasicContentHeaderProperties props = convertContentHeaderProperties(message, vhost); + ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); + chb.setBodySize(message.getSize()); + return chb; + } + + private MessagePublishInfo convertPublishBody(MessageTransferMessage message) + { + DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); + final AMQShortString exchangeName = (delvProps == null || delvProps.getExchange() == null) + ? null + : new AMQShortString(delvProps.getExchange()); + final AMQShortString routingKey = (delvProps == null || delvProps.getRoutingKey() == null) + ? null + : new AMQShortString(delvProps.getRoutingKey()); + final boolean immediate = delvProps != null && delvProps.getImmediate(); + final boolean mandatory = delvProps != null && !delvProps.getDiscardUnroutable(); + + return new MessagePublishInfo() + { + @Override + public AMQShortString getExchange() + { + return exchangeName; + } + + @Override + public void setExchange(AMQShortString exchange) + { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isImmediate() + { + return immediate; + } + + @Override + public boolean isMandatory() + { + return mandatory; + } + + @Override + public AMQShortString getRoutingKey() + { + return routingKey; + } + }; + } + + @Override + public String getType() + { + return "v0-10 to v0-8"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java new file mode 100644 index 0000000000..e1e8fbd9d3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v0_10/MessageConverter_0_8_to_0_10.java @@ -0,0 +1,226 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.converter.v0_8_v0_10; + +import java.net.URISyntaxException; +import java.nio.ByteBuffer; +import java.util.Map; +import java.util.UUID; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; +import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; +import org.apache.qpid.transport.ReplyTo; +import org.apache.qpid.url.AMQBindingURL; + +public class MessageConverter_0_8_to_0_10 implements MessageConverter<AMQMessage, MessageTransferMessage> +{ + @Override + public Class<AMQMessage> getInputClass() + { + return AMQMessage.class; + } + + @Override + public Class<MessageTransferMessage> getOutputClass() + { + return MessageTransferMessage.class; + } + + @Override + public MessageTransferMessage convert(AMQMessage message_0_8, VirtualHost vhost) + { + return new MessageTransferMessage(convertToStoredMessage(message_0_8), null); + } + + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final AMQMessage message_0_8) + { + final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(message_0_8); + return new StoredMessage<MessageMetaData_0_10>() + { + @Override + public MessageMetaData_0_10 getMetaData() + { + return messageMetaData_0_10; + } + + @Override + public long getMessageNumber() + { + return message_0_8.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return message_0_8.getContent(dst, offsetInMessage); + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return message_0_8.getContent(offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData_0_10 convertMetaData(AMQMessage message_0_8) + { + DeliveryProperties deliveryProps = new DeliveryProperties(); + MessageProperties messageProps = new MessageProperties(); + + int size = (int) message_0_8.getSize(); + ByteBuffer body = ByteBuffer.allocate(size); + message_0_8.getContent(body, 0); + body.flip(); + + BasicContentHeaderProperties properties = + (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); + + final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); + if(exchange != null) + { + deliveryProps.setExchange(exchange.toString()); + } + + deliveryProps.setExpiration(message_0_8.getExpiration()); + deliveryProps.setImmediate(message_0_8.isImmediate()); + deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority())); + deliveryProps.setRoutingKey(message_0_8.getRoutingKey()); + deliveryProps.setTimestamp(properties.getTimestamp()); + + messageProps.setContentEncoding(properties.getEncodingAsString()); + messageProps.setContentLength(size); + if(properties.getAppId() != null) + { + messageProps.setAppId(properties.getAppId().getBytes()); + } + messageProps.setContentType(properties.getContentTypeAsString()); + if(properties.getCorrelationId() != null) + { + messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); + } + + if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + { + String origReplyToString = properties.getReplyTo().asString(); + ReplyTo replyTo = new ReplyTo(); + // if the string looks like a binding URL, then attempt to parse it... + try + { + AMQBindingURL burl = new AMQBindingURL(origReplyToString); + AMQShortString routingKey = burl.getRoutingKey(); + if(routingKey != null) + { + replyTo.setRoutingKey(routingKey.asString()); + } + + AMQShortString exchangeName = burl.getExchangeName(); + if(exchangeName != null) + { + replyTo.setExchange(exchangeName.asString()); + } + } + catch (URISyntaxException e) + { + replyTo.setRoutingKey(origReplyToString); + } + messageProps.setReplyTo(replyTo); + + } + + if(properties.getMessageId() != null) + { + try + { + String messageIdAsString = properties.getMessageIdAsString(); + if(messageIdAsString.startsWith("ID:")) + { + messageIdAsString = messageIdAsString.substring(3); + } + UUID uuid = UUID.fromString(messageIdAsString); + messageProps.setMessageId(uuid); + } + catch(IllegalArgumentException e) + { + // ignore - can't parse + } + } + + + + if(properties.getUserId() != null) + { + messageProps.setUserId(properties.getUserId().getBytes()); + } + + FieldTable fieldTable = properties.getHeaders(); + + Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); + + if(properties.getType() != null) + { + appHeaders.put("x-jms-type", properties.getTypeAsString()); + } + + + messageProps.setApplicationHeaders(appHeaders); + + Header header = new Header(deliveryProps, messageProps, null); + + + return new MessageMetaData_0_10(header, size, message_0_8.getArrivalTime()); + } + + @Override + public String getType() + { + return "v0-8 to v0-10"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java new file mode 100644 index 0000000000..0d9d59ff56 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/converter/v0_8_v1_0/MessageConverter_0_8_to_1_0.java @@ -0,0 +1,125 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.converter.v0_8_v1_0; + +import java.util.ArrayList; +import java.util.List; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.UnsignedByte; +import org.apache.qpid.amqp_1_0.type.UnsignedInteger; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.framing.AMQShortString; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.protocol.v1_0.MessageConverter_to_1_0; +import org.apache.qpid.server.protocol.v1_0.MessageMetaData_1_0; + +public class MessageConverter_0_8_to_1_0 extends MessageConverter_to_1_0<AMQMessage> +{ + @Override + public Class<AMQMessage> getInputClass() + { + return AMQMessage.class; + } + + protected MessageMetaData_1_0 convertMetaData(final AMQMessage serverMessage, SectionEncoder sectionEncoder) + { + + List<Section> sections = new ArrayList<Section>(3); + + Header header = new Header(); + + header.setDurable(serverMessage.isPersistent()); + + BasicContentHeaderProperties contentHeader = + (BasicContentHeaderProperties) serverMessage.getContentHeaderBody().getProperties(); + + header.setPriority(UnsignedByte.valueOf(contentHeader.getPriority())); + final long expiration = serverMessage.getExpiration(); + final long arrivalTime = serverMessage.getArrivalTime(); + + if(expiration > arrivalTime) + { + header.setTtl(UnsignedInteger.valueOf(expiration - arrivalTime)); + } + sections.add(header); + + + Properties props = new Properties(); + + /* + TODO: The following properties are not currently set: + + creationTime + groupId + groupSequence + replyToGroupId + to + */ + + props.setContentEncoding(Symbol.valueOf(contentHeader.getEncodingAsString())); + + props.setContentType(Symbol.valueOf(contentHeader.getContentTypeAsString())); + + // Modify the content type when we are dealing with java object messages produced by the Qpid 0.x client + if(props.getContentType() == Symbol.valueOf("application/java-object-stream")) + { + props.setContentType(Symbol.valueOf("application/x-java-serialized-object")); + } + + final AMQShortString correlationId = contentHeader.getCorrelationId(); + if(correlationId != null) + { + props.setCorrelationId(new Binary(correlationId.getBytes())); + } + + final AMQShortString messageId = contentHeader.getMessageId(); + if(messageId != null) + { + props.setMessageId(new Binary(messageId.getBytes())); + } + props.setReplyTo(String.valueOf(contentHeader.getReplyTo())); + + props.setSubject(serverMessage.getRoutingKey()); + if(contentHeader.getUserId() != null) + { + props.setUserId(new Binary(contentHeader.getUserId().getBytes())); + } + + sections.add(props); + + sections.add(new ApplicationProperties(FieldTable.convertToMap(contentHeader.getHeaders()))); + + return new MessageMetaData_1_0(sections, sectionEncoder); + } + + @Override + public String getType() + { + return "v0-8 to v1-0"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java new file mode 100644 index 0000000000..c6ae0c6e47 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageConverter_v0_10.java @@ -0,0 +1,138 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.DeliveryProperties; +import org.apache.qpid.transport.Header; +import org.apache.qpid.transport.MessageDeliveryPriority; +import org.apache.qpid.transport.MessageProperties; + +public class MessageConverter_v0_10 implements MessageConverter<ServerMessage, MessageTransferMessage> +{ + @Override + public Class<ServerMessage> getInputClass() + { + return ServerMessage.class; + } + + @Override + public Class<MessageTransferMessage> getOutputClass() + { + return MessageTransferMessage.class; + } + + @Override + public MessageTransferMessage convert(ServerMessage serverMsg, VirtualHost vhost) + { + return new MessageTransferMessage(convertToStoredMessage(serverMsg), null); + } + + private StoredMessage<MessageMetaData_0_10> convertToStoredMessage(final ServerMessage serverMsg) + { + final MessageMetaData_0_10 messageMetaData_0_10 = convertMetaData(serverMsg); + + return new StoredMessage<MessageMetaData_0_10>() + { + @Override + public MessageMetaData_0_10 getMetaData() + { + return messageMetaData_0_10; + } + + @Override + public long getMessageNumber() + { + return serverMsg.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + return serverMsg.getContent(dst, offsetInMessage); + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + return serverMsg.getContent(offsetInMessage, size); + } + + @Override + public StoreFuture flushToStore() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + private MessageMetaData_0_10 convertMetaData(ServerMessage serverMsg) + { + DeliveryProperties deliveryProps = new DeliveryProperties(); + MessageProperties messageProps = new MessageProperties(); + + int size = (int) serverMsg.getSize(); + ByteBuffer body = ByteBuffer.allocate(size); + serverMsg.getContent(body, 0); + body.flip(); + + + deliveryProps.setExpiration(serverMsg.getExpiration()); + deliveryProps.setImmediate(serverMsg.isImmediate()); + deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); + deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); + deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + + messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); + messageProps.setContentLength(size); + messageProps.setContentType(serverMsg.getMessageHeader().getMimeType()); + if(serverMsg.getMessageHeader().getCorrelationId() != null) + { + messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); + } + + Header header = new Header(deliveryProps, messageProps, null); + return new MessageMetaData_0_10(header, size, serverMsg.getArrivalTime()); + } + + @Override + public String getType() + { + return "Unknown to v0-10"; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java index 87d482c44c..c6bceb6ac7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/Subscription_0_10.java @@ -33,7 +33,8 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; -import org.apache.qpid.server.protocol.v0_8.AMQMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.queue.AMQQueue; @@ -48,21 +49,16 @@ import org.apache.qpid.transport.Header; import org.apache.qpid.transport.MessageAcceptMode; import org.apache.qpid.transport.MessageAcquireMode; import org.apache.qpid.transport.MessageCreditUnit; -import org.apache.qpid.transport.MessageDeliveryPriority; import org.apache.qpid.transport.MessageFlowMode; import org.apache.qpid.transport.MessageProperties; import org.apache.qpid.transport.MessageTransfer; import org.apache.qpid.transport.Method; import org.apache.qpid.transport.Option; -import org.apache.qpid.transport.ReplyTo; import org.apache.qpid.transport.Struct; -import org.apache.qpid.url.AMQBindingURL; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPTION_FORMAT; -import java.net.URISyntaxException; -import java.nio.ByteBuffer; import java.text.MessageFormat; import java.util.Arrays; import java.util.Collections; @@ -209,10 +205,21 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr return false; } - if (_noLocal && entry.getMessage() instanceof MessageTransferMessage) + if (entry.getMessage() instanceof MessageTransferMessage) { - Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); - if (connectionRef != null && connectionRef == _session.getReference()) + if(_noLocal) + { + Object connectionRef = ((MessageTransferMessage)entry.getMessage()).getConnectionReference(); + if (connectionRef != null && connectionRef == _session.getReference()) + { + return false; + } + } + } + else + { + // no interest in messages we can't convert + if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), MessageTransferMessage.class)==null) { return false; } @@ -348,200 +355,72 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr DeliveryProperties deliveryProps; MessageProperties messageProps = null; + MessageTransferMessage msg; + if(serverMsg instanceof MessageTransferMessage) { - MessageTransferMessage msg = (MessageTransferMessage) serverMsg; - DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); - messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - - deliveryProps = new DeliveryProperties(); - if(origDeliveryProps != null) - { - if(origDeliveryProps.hasDeliveryMode()) - { - deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); - } - if(origDeliveryProps.hasExchange()) - { - deliveryProps.setExchange(origDeliveryProps.getExchange()); - } - if(origDeliveryProps.hasExpiration()) - { - deliveryProps.setExpiration(origDeliveryProps.getExpiration()); - } - if(origDeliveryProps.hasPriority()) - { - deliveryProps.setPriority(origDeliveryProps.getPriority()); - } - if(origDeliveryProps.hasRoutingKey()) - { - deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); - } - if(origDeliveryProps.hasTimestamp()) - { - deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); - } - if(origDeliveryProps.hasTtl()) - { - deliveryProps.setTtl(origDeliveryProps.getTtl()); - } - + msg = (MessageTransferMessage) serverMsg; - } - - deliveryProps.setRedelivered(entry.isRedelivered()); - - if(_trace != null && messageProps == null) - { - messageProps = new MessageProperties(); - } - - Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - - - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); } - else if(serverMsg instanceof AMQMessage) + else { - AMQMessage message_0_8 = (AMQMessage) serverMsg; - deliveryProps = new DeliveryProperties(); - messageProps = new MessageProperties(); + MessageConverter converter = + MessageConverterRegistry.getConverter(serverMsg.getClass(), MessageTransferMessage.class); + - int size = (int) message_0_8.getSize(); - ByteBuffer body = ByteBuffer.allocate(size); - message_0_8.getContent(body, 0); - body.flip(); + msg = (MessageTransferMessage) converter.convert(serverMsg, getQueue().getVirtualHost()); + } + DeliveryProperties origDeliveryProps = msg.getHeader() == null ? null : msg.getHeader().getDeliveryProperties(); + messageProps = msg.getHeader() == null ? null : msg.getHeader().getMessageProperties(); - BasicContentHeaderProperties properties = - (BasicContentHeaderProperties) message_0_8.getContentHeaderBody().getProperties(); - final AMQShortString exchange = message_0_8.getMessagePublishInfo().getExchange(); - if(exchange != null) + deliveryProps = new DeliveryProperties(); + if(origDeliveryProps != null) + { + if(origDeliveryProps.hasDeliveryMode()) { - deliveryProps.setExchange(exchange.toString()); + deliveryProps.setDeliveryMode(origDeliveryProps.getDeliveryMode()); } - deliveryProps.setExpiration(message_0_8.getExpiration()); - deliveryProps.setImmediate(message_0_8.isImmediate()); - deliveryProps.setPriority(MessageDeliveryPriority.get(properties.getPriority())); - deliveryProps.setRedelivered(entry.isRedelivered()); - deliveryProps.setRoutingKey(message_0_8.getRoutingKey()); - deliveryProps.setTimestamp(properties.getTimestamp()); - - messageProps.setContentEncoding(properties.getEncodingAsString()); - messageProps.setContentLength(size); - if(properties.getAppId() != null) + if(origDeliveryProps.hasExchange()) { - messageProps.setAppId(properties.getAppId().getBytes()); + deliveryProps.setExchange(origDeliveryProps.getExchange()); } - messageProps.setContentType(properties.getContentTypeAsString()); - if(properties.getCorrelationId() != null) + if(origDeliveryProps.hasExpiration()) { - messageProps.setCorrelationId(properties.getCorrelationId().getBytes()); + deliveryProps.setExpiration(origDeliveryProps.getExpiration()); } - - if(properties.getReplyTo() != null && properties.getReplyTo().length() != 0) + if(origDeliveryProps.hasPriority()) { - String origReplyToString = properties.getReplyTo().asString(); - ReplyTo replyTo = new ReplyTo(); - // if the string looks like a binding URL, then attempt to parse it... - try - { - AMQBindingURL burl = new AMQBindingURL(origReplyToString); - AMQShortString routingKey = burl.getRoutingKey(); - if(routingKey != null) - { - replyTo.setRoutingKey(routingKey.asString()); - } - - AMQShortString exchangeName = burl.getExchangeName(); - if(exchangeName != null) - { - replyTo.setExchange(exchangeName.asString()); - } - } - catch (URISyntaxException e) - { - replyTo.setRoutingKey(origReplyToString); - } - messageProps.setReplyTo(replyTo); - + deliveryProps.setPriority(origDeliveryProps.getPriority()); } - - if(properties.getMessageId() != null) + if(origDeliveryProps.hasRoutingKey()) { - try - { - String messageIdAsString = properties.getMessageIdAsString(); - if(messageIdAsString.startsWith("ID:")) - { - messageIdAsString = messageIdAsString.substring(3); - } - UUID uuid = UUID.fromString(messageIdAsString); - messageProps.setMessageId(uuid); - } - catch(IllegalArgumentException e) - { - // ignore - can't parse - } + deliveryProps.setRoutingKey(origDeliveryProps.getRoutingKey()); } - - - - if(properties.getUserId() != null) + if(origDeliveryProps.hasTimestamp()) { - messageProps.setUserId(properties.getUserId().getBytes()); + deliveryProps.setTimestamp(origDeliveryProps.getTimestamp()); } - - FieldTable fieldTable = properties.getHeaders(); - - Map<String, Object> appHeaders = FieldTable.convertToMap(fieldTable); - - if(properties.getType() != null) + if(origDeliveryProps.hasTtl()) { - appHeaders.put("x-jms-type", properties.getTypeAsString()); + deliveryProps.setTtl(origDeliveryProps.getTtl()); } - messageProps.setApplicationHeaders(appHeaders); - - Header header = new Header(deliveryProps, messageProps, null); - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); } - else - { - - deliveryProps = new DeliveryProperties(); - messageProps = new MessageProperties(); - - int size = (int) serverMsg.getSize(); - ByteBuffer body = ByteBuffer.allocate(size); - serverMsg.getContent(body, 0); - body.flip(); + deliveryProps.setRedelivered(entry.isRedelivered()); - deliveryProps.setExpiration(serverMsg.getExpiration()); - deliveryProps.setImmediate(serverMsg.isImmediate()); - deliveryProps.setPriority(MessageDeliveryPriority.get(serverMsg.getMessageHeader().getPriority())); - deliveryProps.setRedelivered(entry.isRedelivered()); - deliveryProps.setRoutingKey(serverMsg.getRoutingKey()); - deliveryProps.setTimestamp(serverMsg.getMessageHeader().getTimestamp()); + if(_trace != null && messageProps == null) + { + messageProps = new MessageProperties(); + } - messageProps.setContentEncoding(serverMsg.getMessageHeader().getEncoding()); - messageProps.setContentLength(size); - messageProps.setContentType(serverMsg.getMessageHeader().getMimeType()); - if(serverMsg.getMessageHeader().getCorrelationId() != null) - { - messageProps.setCorrelationId(serverMsg.getMessageHeader().getCorrelationId().getBytes()); - } + Header header = new Header(deliveryProps, messageProps, msg.getHeader() == null ? null : msg.getHeader().getNonStandardProperties()); - // TODO - ReplyTo - Header header = new Header(deliveryProps, messageProps, null); - xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body, BATCHED) - : new MessageTransfer(_destination,_acceptMode,_acquireMode,header, body); - } + xfr = batch ? new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody(), BATCHED) + : new MessageTransfer(_destination,_acceptMode,_acquireMode,header,msg.getBody()); boolean excludeDueToFederation = false; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 3567a005d2..416a4da183 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -152,7 +152,7 @@ public class AMQMessage extends AbstractServerMessageImpl<MessageMetaData> } - public MessagePublishInfo getMessagePublishInfo() throws AMQException + public MessagePublishInfo getMessagePublishInfo() { return getMessageMetaData().getMessagePublishInfo(); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java index 23fc335dc7..ae8e019bd0 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/SubscriptionImpl.java @@ -35,6 +35,7 @@ import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.SubscriptionActor; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.subjects.SubscriptionLogSubject; +import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.output.ProtocolOutputConverter; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.protocol.AMQSessionModel; @@ -482,16 +483,27 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage } } - if (_noLocal) + if(entry.getMessage() instanceof AMQMessage) { - AMQMessage message = (AMQMessage) entry.getMessage(); + if (_noLocal) + { + AMQMessage message = (AMQMessage) entry.getMessage(); - final Object publisherReference = message.getConnectionIdentifier(); + final Object publisherReference = message.getConnectionIdentifier(); - // We don't want local messages so check to see if message is one we sent - Object localReference = getProtocolSession().getReference(); + // We don't want local messages so check to see if message is one we sent + Object localReference = getProtocolSession().getReference(); - if(publisherReference != null && publisherReference.equals(localReference)) + if(publisherReference != null && publisherReference.equals(localReference)) + { + return false; + } + } + } + else + { + // No interest in messages we can't convert to AMQMessage + if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), AMQMessage.class)==null) { return false; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java index 9e7d04e0fb..adba8e665a 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/output/ProtocolOutputConverterImpl.java @@ -27,19 +27,19 @@ import org.apache.qpid.framing.AMQFrame; import org.apache.qpid.framing.AMQMethodBody; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicCancelOkBody; -import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.BasicGetOkBody; import org.apache.qpid.framing.BasicReturnBody; import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.framing.MethodRegistry; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.protocol.AMQVersionAwareProtocolSession; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.server.protocol.v0_8.AMQMessage; import org.apache.qpid.server.message.MessageContentSource; -import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.protocol.AMQProtocolSession; import org.apache.qpid.server.queue.QueueEntry; -import org.apache.qpid.transport.DeliveryProperties; import java.io.DataOutput; import java.io.IOException; @@ -67,33 +67,34 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { - AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag); - writeMessageDelivery(entry, channelId, deliverBody); + AMQMessage msg = convertToAMQMessage(entry); + AMQBody deliverBody = createEncodedDeliverBody(msg, entry.isRedelivered(), deliveryTag, consumerTag); + writeMessageDelivery(msg, channelId, deliverBody); } - - private ContentHeaderBody getContentHeaderBody(QueueEntry entry) - throws AMQException + private AMQMessage convertToAMQMessage(QueueEntry entry) { - if(entry.getMessage() instanceof AMQMessage) + ServerMessage serverMessage = entry.getMessage(); + if(serverMessage instanceof AMQMessage) { - return ((AMQMessage)entry.getMessage()).getContentHeaderBody(); + return (AMQMessage) serverMessage; } else { - final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message, entry.getQueue().getVirtualHost()); - ContentHeaderBody chb = new ContentHeaderBody(props, BASIC_CLASS_ID); - chb.setBodySize(message.getSize()); - return chb; + return getMessageConverter(serverMessage).convert(serverMessage, entry.getQueue().getVirtualHost()); } } + private <M extends ServerMessage> MessageConverter<M, AMQMessage> getMessageConverter(M message) + { + Class<M> clazz = (Class<M>) message.getClass(); + return MessageConverterRegistry.getConverter(clazz, AMQMessage.class); + } - private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody) + private void writeMessageDelivery(AMQMessage message, int channelId, AMQBody deliverBody) throws AMQException { - writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody); + writeMessageDelivery(message, message.getContentHeaderBody(), channelId, deliverBody); } private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody) @@ -188,35 +189,23 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException { AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize); - writeMessageDelivery(entry, channelId, deliver); + writeMessageDelivery(convertToAMQMessage(entry), channelId, deliver); } - private AMQBody createEncodedDeliverBody(QueueEntry entry, - final long deliveryTag, - final AMQShortString consumerTag) + private AMQBody createEncodedDeliverBody(AMQMessage message, + boolean isRedelivered, + final long deliveryTag, + final AMQShortString consumerTag) throws AMQException { final AMQShortString exchangeName; final AMQShortString routingKey; - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } - - final boolean isRedelivered = entry.isRedelivered(); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); final AMQBody returnBlock = new EncodedDeliveryBody(deliveryTag, routingKey, exchangeName, consumerTag, isRedelivered); return returnBlock; @@ -291,20 +280,10 @@ class ProtocolOutputConverterImpl implements ProtocolOutputConverter final AMQShortString exchangeName; final AMQShortString routingKey; - if(entry.getMessage() instanceof AMQMessage) - { - final AMQMessage message = (AMQMessage) entry.getMessage(); - final MessagePublishInfo pb = message.getMessagePublishInfo(); - exchangeName = pb.getExchange(); - routingKey = pb.getRoutingKey(); - } - else - { - MessageTransferMessage message = (MessageTransferMessage) entry.getMessage(); - DeliveryProperties delvProps = message.getHeader().getDeliveryProperties(); - exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange()); - routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey()); - } + final AMQMessage message = convertToAMQMessage(entry); + final MessagePublishInfo pb = message.getMessagePublishInfo(); + exchangeName = pb.getExchange(); + routingKey = pb.getRoutingKey(); final boolean isRedelivered = entry.isRedelivered(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java new file mode 100644 index 0000000000..be9b0323a3 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java @@ -0,0 +1,245 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import java.io.EOFException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.amqp_1_0.type.Binary; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.store.StoreFuture; +import org.apache.qpid.server.store.StoredMessage; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.transport.codec.BBDecoder; +import org.apache.qpid.typedmessage.TypedBytesContentReader; +import org.apache.qpid.typedmessage.TypedBytesFormatException; + +public abstract class MessageConverter_to_1_0<M extends ServerMessage> implements MessageConverter<M, Message_1_0> +{ + private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance() + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer(); + + @Override + public final Class<Message_1_0> getOutputClass() + { + return Message_1_0.class; + } + + @Override + public final Message_1_0 convert(M message, VirtualHost vhost) + { + + SectionEncoder sectionEncoder = new SectionEncoderImpl(_typeRegistry); + return new Message_1_0(convertToStoredMessage(message, sectionEncoder)); + } + + + private StoredMessage<MessageMetaData_1_0> convertToStoredMessage(final M serverMessage, SectionEncoder sectionEncoder) + { + final MessageMetaData_1_0 metaData = convertMetaData(serverMessage, sectionEncoder); + return convertServerMessage(metaData, serverMessage, sectionEncoder); + } + + abstract protected MessageMetaData_1_0 convertMetaData(final M serverMessage, SectionEncoder sectionEncoder); + + + private static Section convertMessageBody(String mimeType, byte[] data) + { + if("text/plain".equals(mimeType) || "text/xml".equals(mimeType)) + { + String text = new String(data); + return new AmqpValue(text); + } + else if("jms/map-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + LinkedHashMap map = new LinkedHashMap(); + final int entries = reader.readIntImpl(); + for (int i = 0; i < entries; i++) + { + try + { + String propName = reader.readStringImpl(); + Object value = reader.readObject(); + map.put(propName, value); + } + catch (EOFException e) + { + throw new IllegalArgumentException(e); + } + catch (TypedBytesFormatException e) + { + throw new IllegalArgumentException(e); + } + + } + + return new AmqpValue(map); + + } + else if("amqp/map".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readMap()); + + } + else if("amqp/list".equals(mimeType)) + { + BBDecoder decoder = new BBDecoder(); + decoder.init(ByteBuffer.wrap(data)); + return new AmqpValue(decoder.readList()); + } + else if("jms/stream-message".equals(mimeType)) + { + TypedBytesContentReader reader = new TypedBytesContentReader(ByteBuffer.wrap(data)); + + List list = new ArrayList(); + while (reader.remaining() != 0) + { + try + { + list.add(reader.readObject()); + } + catch (TypedBytesFormatException e) + { + throw new RuntimeException(e); // TODO - Implement + } + catch (EOFException e) + { + throw new RuntimeException(e); // TODO - Implement + } + } + return new AmqpValue(list); + } + else + { + return new Data(new Binary(data)); + + } + } + + private StoredMessage<MessageMetaData_1_0> convertServerMessage(final MessageMetaData_1_0 metaData, + final ServerMessage serverMessage, + SectionEncoder sectionEncoder) + { + final String mimeType = serverMessage.getMessageHeader().getMimeType(); + byte[] data = new byte[(int) serverMessage.getSize()]; + serverMessage.getContent(ByteBuffer.wrap(data), 0); + + Section bodySection = convertMessageBody(mimeType, data); + + final ByteBuffer allData = encodeConvertedMessage(metaData, bodySection, sectionEncoder); + + return new StoredMessage<MessageMetaData_1_0>() + { + @Override + public MessageMetaData_1_0 getMetaData() + { + return metaData; + } + + @Override + public long getMessageNumber() + { + return serverMessage.getMessageNumber(); + } + + @Override + public void addContent(int offsetInMessage, ByteBuffer src) + { + throw new UnsupportedOperationException(); + } + + @Override + public int getContent(int offsetInMessage, ByteBuffer dst) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + int size; + if(dst.remaining()<buf.remaining()) + { + buf.limit(dst.remaining()); + size = dst.remaining(); + } + else + { + size = buf.remaining(); + } + dst.put(buf); + return size; + } + + @Override + public ByteBuffer getContent(int offsetInMessage, int size) + { + ByteBuffer buf = allData.duplicate(); + buf.position(offsetInMessage); + buf = buf.slice(); + if(size < buf.remaining()) + { + buf.limit(size); + } + return buf; + } + + @Override + public StoreFuture flushToStore() + { + throw new UnsupportedOperationException(); + } + + @Override + public void remove() + { + serverMessage.getStoredMessage().remove(); + } + }; + } + + private ByteBuffer encodeConvertedMessage(MessageMetaData_1_0 metaData, Section bodySection, SectionEncoder sectionEncoder) + { + int headerSize = (int) metaData.getStorableSize(); + + sectionEncoder.reset(); + sectionEncoder.encodeObject(bodySection); + Binary dataEncoding = sectionEncoder.getEncoding(); + + final ByteBuffer allData = ByteBuffer.allocate(headerSize + dataEncoding.getLength()); + metaData.writeToBuffer(0,allData); + allData.put(dataEncoding.getArray(),dataEncoding.getArrayOffset(),dataEncoding.getLength()); + return allData; + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index 36eac879b9..7d0e3e3dbb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -59,6 +59,8 @@ import org.apache.qpid.amqp_1_0.type.transport.Transfer; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.BasicContentHeaderProperties; import org.apache.qpid.framing.FieldTable; +import org.apache.qpid.server.plugin.MessageConverter; +import org.apache.qpid.server.protocol.MessageConverterRegistry; import org.apache.qpid.typedmessage.TypedBytesContentReader; import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; @@ -168,9 +170,18 @@ class Subscription_1_0 implements Subscription public boolean hasInterest(final QueueEntry entry) { - return !(_noLocal && (entry.getMessage() instanceof Message_1_0) - && ((Message_1_0)entry.getMessage()).getSession() == getSession()) - && checkFilters(entry); + if(entry.getMessage() instanceof Message_1_0) + { + if(_noLocal && ((Message_1_0)entry.getMessage()).getSession() == getSession()) + { + return false; + } + } + else if(MessageConverterRegistry.getConverter(entry.getMessage().getClass(), Message_1_0.class)==null) + { + return false; + } + return checkFilters(entry); } @@ -221,18 +232,8 @@ class Subscription_1_0 implements Subscription } else { - if(serverMessage instanceof AMQMessage) - { - message = new Message_1_0(convert08Message((AMQMessage)serverMessage)); - } - else if(serverMessage instanceof MessageTransferMessage) - { - message = new Message_1_0(convert010Message((MessageTransferMessage)serverMessage)); - } - else - { - return; - } + final MessageConverter converter = MessageConverterRegistry.getConverter(serverMessage.getClass(), Message_1_0.class); + message = (Message_1_0) converter.convert(serverMessage, queueEntry.getQueue().getVirtualHost()); } Transfer transfer = new Transfer(); diff --git a/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter new file mode 100644 index 0000000000..64c497d433 --- /dev/null +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageConverter @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_8_to_0_10 +org.apache.qpid.server.protocol.converter.v0_8_v0_10.MessageConverter_0_10_to_0_8 +org.apache.qpid.server.protocol.converter.v0_8_v1_0.MessageConverter_0_8_to_1_0 +org.apache.qpid.server.protocol.converter.v0_10_v1_0.MessageConverter_0_10_to_1_0 +org.apache.qpid.server.protocol.v0_10.MessageConverter_v0_10 diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType index 9aa1d4ce11..9aa1d4ce11 100644 --- a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType +++ b/qpid/java/broker/src/main/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType |
