From c9de50719b7d7566cc4f0f2cae39bbcf824420de Mon Sep 17 00:00:00 2001 From: Robert Godfrey Date: Mon, 15 Jul 2013 11:59:18 +0000 Subject: QPID-4659 : [Java Broker] make message meta data pluggable for different protcol versions git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503192 13f79535-47bb-0310-9956-ffa450edef68 --- .../berkeleydb/tuple/MessageMetaDataBinding.java | 9 +- .../store/berkeleydb/BDBMessageStoreTest.java | 8 +- .../qpid/server/message/MessageMetaData_1_0.java | 566 -------------------- .../qpid/server/plugin/MessageMetaDataType.java | 42 ++ .../protocol/v0_10/MessageMetaDataType_0_10.java | 67 +++ .../protocol/v0_10/MessageMetaData_0_10.java | 8 +- .../qpid/server/protocol/v0_8/MessageMetaData.java | 5 +- .../protocol/v0_8/MessageMetaDataType_0_8.java | 67 +++ .../protocol/v1_0/MessageMetaDataType_1_0.java | 67 +++ .../server/protocol/v1_0/MessageMetaData_1_0.java | 569 +++++++++++++++++++++ .../qpid/server/protocol/v1_0/Message_1_0.java | 3 +- .../server/protocol/v1_0/ReceivingLink_1_0.java | 1 - .../server/protocol/v1_0/Subscription_1_0.java | 1 - .../server/store/AbstractJDBCMessageStore.java | 9 +- .../qpid/server/store/MessageMetaDataType.java | 44 -- .../server/store/MessageMetaDataTypeRegistry.java | 65 +++ .../qpid/server/store/StorableMessageMetaData.java | 2 + .../VirtualHostConfigRecoveryHandler.java | 16 +- ...g.apache.qpid.server.plugin.MessageMetaDataType | 21 + 19 files changed, 924 insertions(+), 646 deletions(-) delete mode 100755 qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java create mode 100755 qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java delete mode 100755 qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java create mode 100644 qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java create mode 100644 qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType (limited to 'qpid/java') diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java index 2e6c8d5666..6925c9ee2b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuple/MessageMetaDataBinding.java @@ -26,7 +26,8 @@ import com.sleepycat.bind.tuple.TupleBinding; import com.sleepycat.bind.tuple.TupleInput; import com.sleepycat.bind.tuple.TupleOutput; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.store.MessageMetaDataTypeRegistry; import org.apache.qpid.server.store.StorableMessageMetaData; /** @@ -54,10 +55,8 @@ public class MessageMetaDataBinding extends TupleBinding _encodedSections = new ArrayList(3); - - private volatile ByteBuffer _encoded; - private MessageHeader_1_0 _messageHeader; - - - public MessageMetaData_1_0(List
sections, SectionEncoder encoder) - { - this(sections, encodeSections(sections, encoder)); - } - - private static ArrayList encodeSections(final List
sections, final SectionEncoder encoder) - { - ArrayList encodedSections = new ArrayList(sections.size()); - for(Section section : sections) - { - encoder.encodeObject(section); - encodedSections.add(encoder.getEncoding().asByteBuffer()); - encoder.reset(); - } - return encodedSections; - } - - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) - { - this(fragments, decoder, new ArrayList(3)); - } - - public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List immuatableSections) - { - this(constructSections(fragments, decoder,immuatableSections), immuatableSections); - } - - private MessageMetaData_1_0(List
sections, List encodedSections) - { - _encodedSections = encodedSections; - - Iterator
sectIter = sections.iterator(); - - Section section = sectIter.hasNext() ? sectIter.next() : null; - if(section instanceof Header) - { - _header = (Header) section; - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof DeliveryAnnotations) - { - _deliveryAnnotations = ((DeliveryAnnotations) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof MessageAnnotations) - { - _messageAnnotations = ((MessageAnnotations) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof Properties) - { - _properties = (Properties) section; - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof ApplicationProperties) - { - _appProperties = ((ApplicationProperties) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - if(section instanceof Footer) - { - _footer = ((Footer) section).getValue(); - section = sectIter.hasNext() ? sectIter.next() : null; - } - - _messageHeader = new MessageHeader_1_0(); - - } - - private static List
constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List encodedSections) - { - List
sections = new ArrayList
(3); - - ByteBuffer src; - if(fragments.length == 1) - { - src = fragments[0].duplicate(); - } - else - { - int size = 0; - for(ByteBuffer buf : fragments) - { - size += buf.remaining(); - } - src = ByteBuffer.allocate(size); - for(ByteBuffer buf : fragments) - { - src.put(buf.duplicate()); - } - src.flip(); - - } - - try - { - int startBarePos = -1; - int lastPos = src.position(); - Section s = decoder.readSection(src); - - - - if(s instanceof Header) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof DeliveryAnnotations) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof MessageAnnotations) - { - sections.add(s); - lastPos = src.position(); - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof Properties) - { - sections.add(s); - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof ApplicationProperties) - { - sections.add(s); - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - - if(s instanceof AmqpValue) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - else if(s instanceof Data) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - do - { - s = src.hasRemaining() ? decoder.readSection(src) : null; - } while(s instanceof Data); - } - else if(s instanceof AmqpSequence) - { - if(startBarePos == -1) - { - startBarePos = lastPos; - } - do - { - s = src.hasRemaining() ? decoder.readSection(src) : null; - } - while(s instanceof AmqpSequence); - } - - if(s instanceof Footer) - { - sections.add(s); - } - - - int pos = 0; - for(ByteBuffer buf : fragments) - { -/* - if(pos < startBarePos) - { - if(pos + buf.remaining() > startBarePos) - { - ByteBuffer dup = buf.duplicate(); - dup.position(dup.position()+startBarePos-pos); - dup.slice(); - encodedSections.add(dup); - } - } - else -*/ - { - encodedSections.add(buf.duplicate()); - } - pos += buf.remaining(); - } - - return sections; - } - catch (AmqpErrorException e) - { - e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. - throw new IllegalArgumentException(e); - } - } - - - public MessageMetaDataType getType() - { - return MessageMetaDataType.META_DATA_1_0; - } - - - public int getStorableSize() - { - int size = 0; - - for(ByteBuffer bin : _encodedSections) - { - size += bin.limit(); - } - - return size; - } - - private ByteBuffer encodeAsBuffer() - { - ByteBuffer buf = ByteBuffer.allocate(getStorableSize()); - - for(ByteBuffer bin : _encodedSections) - { - buf.put(bin.duplicate()); - } - - return buf; - } - - public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - - buf = buf.duplicate(); - - buf.position(offsetInMetaData); - - if(dest.remaining() < buf.limit()) - { - buf.limit(dest.remaining()); - } - dest.put(buf); - return buf.limit(); - } - - public int getContentSize() - { - ByteBuffer buf = _encoded; - - if(buf == null) - { - buf = encodeAsBuffer(); - _encoded = buf; - } - return buf.remaining(); - } - - public boolean isPersistent() - { - return _header != null && Boolean.TRUE.equals(_header.getDurable()); - } - - public MessageHeader_1_0 getMessageHeader() - { - return _messageHeader; - } - - public static final MessageMetaDataType.Factory FACTORY = new MetaDataFactory(); - - - private static class MetaDataFactory implements MessageMetaDataType.Factory - { - private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); - - private MetaDataFactory() - { - _typeRegistry.registerTransportLayer(); - _typeRegistry.registerMessagingLayer(); - _typeRegistry.registerTransactionLayer(); - _typeRegistry.registerSecurityLayer(); - } - - public MessageMetaData_1_0 createMetaData(ByteBuffer buf) - { - ValueHandler valueHandler = new ValueHandler(_typeRegistry); - - ArrayList
sections = new ArrayList
(3); - ArrayList encodedSections = new ArrayList(3); - - while(buf.hasRemaining()) - { - try - { - ByteBuffer encodedBuf = buf.duplicate(); - Object parse = valueHandler.parse(buf); - sections.add((Section) parse); - encodedBuf.limit(buf.position()); - encodedSections.add(encodedBuf); - - } - catch (AmqpErrorException e) - { - //TODO - throw new RuntimeException(e); - } - - } - - return new MessageMetaData_1_0(sections,encodedSections); - - } - } - - public class MessageHeader_1_0 implements AMQMessageHeader - { - - public String getCorrelationId() - { - if(_properties == null || _properties.getCorrelationId() == null) - { - return null; - } - else - { - return _properties.getMessageId().toString(); - } - } - - public long getExpiration() - { - return 0; //TODO - } - - public String getMessageId() - { - if(_properties == null || _properties.getCorrelationId() == null) - { - return null; - } - else - { - return _properties.getCorrelationId().toString(); - } - } - - public String getMimeType() - { - - if(_properties == null || _properties.getContentType() == null) - { - return null; - } - else - { - return _properties.getContentType().toString(); - } - } - - public String getEncoding() - { - return null; //TODO - } - - public byte getPriority() - { - if(_header == null || _header.getPriority() == null) - { - return 4; //javax.jms.Message.DEFAULT_PRIORITY; - } - else - { - return _header.getPriority().byteValue(); - } - } - - public long getTimestamp() - { - if(_properties == null || _properties.getCreationTime() == null) - { - return 0L; - } - else - { - return _properties.getCreationTime().getTime(); - } - - } - - public String getType() - { - - if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null) - { - return null; - } - else - { - return _messageAnnotations.get(JMS_TYPE).toString(); - } - } - - public String getReplyTo() - { - if(_properties == null || _properties.getReplyTo() == null) - { - return null; - } - else - { - return _properties.getReplyTo().toString(); - } - } - - public String getReplyToExchange() - { - return null; //TODO - } - - public String getReplyToRoutingKey() - { - return null; //TODO - } - - public String getAppId() - { - //TODO - return null; - } - - public String getUserId() - { - // TODO - return null; - } - - public Object getHeader(final String name) - { - return _appProperties == null ? null : _appProperties.get(name); - } - - public boolean containsHeaders(final Set names) - { - if(_appProperties == null) - { - return false; - } - - for(String key : names) - { - if(!_appProperties.containsKey(key)) - { - return false; - } - } - return true; - } - - @Override - public Collection getHeaderNames() - { - if(_appProperties == null) - { - return Collections.emptySet(); - } - return Collections.unmodifiableCollection(_appProperties.keySet()); - } - - public boolean containsHeader(final String name) - { - return _appProperties != null && _appProperties.containsKey(name); - } - - public String getSubject() - { - return _properties == null ? null : _properties.getSubject(); - } - } - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java new file mode 100644 index 0000000000..ee89782307 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java @@ -0,0 +1,42 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.plugin; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; + +public interface MessageMetaDataType extends Pluggable +{ + + public static interface Factory + { + M createMetaData(ByteBuffer buf); + } + + public int ordinal(); + + public M createMetaData(ByteBuffer buf); + + public ServerMessage createMessage(StoredMessage msg); + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java new file mode 100644 index 0000000000..90fb443f5b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_0_10 implements MessageMetaDataType +{ + + public static final int TYPE = 1; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData_0_10 createMetaData(ByteBuffer buf) + { + return MessageMetaData_0_10.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage createMessage(StoredMessage msg) + { + return new MessageTransferMessage(msg, null); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v0_10.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2d17351dad..ee2a40a5b2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -51,6 +51,8 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public static final MessageMetaDataType.Factory FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10(); + private volatile ByteBuffer _encoded; private Object _connectionReference; @@ -60,7 +62,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis()); } - private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) + public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) { _header = header; if(_header != null) @@ -83,7 +85,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public MessageMetaDataType getType() { - return MessageMetaDataType.META_DATA_0_10; + return TYPE; } public int getStorableSize() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index c539f793fe..4cc590d8cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.util.ByteBufferOutputStream; import org.apache.qpid.util.ByteBufferInputStream; @@ -56,6 +56,7 @@ public class MessageMetaData implements StorableMessageMetaData private static final byte MANDATORY_FLAG = 1; private static final byte IMMEDIATE_FLAG = 2; public static final MessageMetaDataType.Factory FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8(); public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) { @@ -112,7 +113,7 @@ public class MessageMetaData implements StorableMessageMetaData public MessageMetaDataType getType() { - return MessageMetaDataType.META_DATA_0_8; + return TYPE; } public int getStorableSize() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java new file mode 100644 index 0000000000..9b50127ec7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_0_8 implements MessageMetaDataType +{ + + public static final int TYPE = 0; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData createMetaData(ByteBuffer buf) + { + return MessageMetaData.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage createMessage(StoredMessage msg) + { + return new AMQMessage(msg); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v0_8.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java new file mode 100644 index 0000000000..44b1de74e1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_1_0 implements MessageMetaDataType +{ + + public static final int TYPE = 2; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) + { + return MessageMetaData_1_0.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage createMessage(StoredMessage msg) + { + return new Message_1_0(msg); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v1_0_0.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java new file mode 100755 index 0000000000..8d48d70d9a --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -0,0 +1,569 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.server.protocol.v1_0; + +import java.nio.ByteBuffer; +import java.util.*; +import org.apache.qpid.amqp_1_0.codec.ValueHandler; +import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; +import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; +import org.apache.qpid.amqp_1_0.type.AmqpErrorException; +import org.apache.qpid.amqp_1_0.type.Section; +import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; +import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; +import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Footer; +import org.apache.qpid.amqp_1_0.type.messaging.Header; +import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.store.StorableMessageMetaData; + +public class MessageMetaData_1_0 implements StorableMessageMetaData +{ + // TODO move to somewhere more useful + public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + public static final MessageMetaDataType.Factory FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0(); + + + private Header _header; + private Properties _properties; + private Map _deliveryAnnotations; + private Map _messageAnnotations; + private Map _appProperties; + private Map _footer; + + private List _encodedSections = new ArrayList(3); + + private volatile ByteBuffer _encoded; + private MessageHeader_1_0 _messageHeader; + + + public MessageMetaData_1_0(List
sections, SectionEncoder encoder) + { + this(sections, encodeSections(sections, encoder)); + } + + private static ArrayList encodeSections(final List
sections, final SectionEncoder encoder) + { + ArrayList encodedSections = new ArrayList(sections.size()); + for(Section section : sections) + { + encoder.encodeObject(section); + encodedSections.add(encoder.getEncoding().asByteBuffer()); + encoder.reset(); + } + return encodedSections; + } + + public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder) + { + this(fragments, decoder, new ArrayList(3)); + } + + public MessageMetaData_1_0(ByteBuffer[] fragments, SectionDecoder decoder, List immuatableSections) + { + this(constructSections(fragments, decoder,immuatableSections), immuatableSections); + } + + private MessageMetaData_1_0(List
sections, List encodedSections) + { + _encodedSections = encodedSections; + + Iterator
sectIter = sections.iterator(); + + Section section = sectIter.hasNext() ? sectIter.next() : null; + if(section instanceof Header) + { + _header = (Header) section; + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof DeliveryAnnotations) + { + _deliveryAnnotations = ((DeliveryAnnotations) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof MessageAnnotations) + { + _messageAnnotations = ((MessageAnnotations) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof Properties) + { + _properties = (Properties) section; + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof ApplicationProperties) + { + _appProperties = ((ApplicationProperties) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + if(section instanceof Footer) + { + _footer = ((Footer) section).getValue(); + section = sectIter.hasNext() ? sectIter.next() : null; + } + + _messageHeader = new MessageHeader_1_0(); + + } + + private static List
constructSections(final ByteBuffer[] fragments, final SectionDecoder decoder, List encodedSections) + { + List
sections = new ArrayList
(3); + + ByteBuffer src; + if(fragments.length == 1) + { + src = fragments[0].duplicate(); + } + else + { + int size = 0; + for(ByteBuffer buf : fragments) + { + size += buf.remaining(); + } + src = ByteBuffer.allocate(size); + for(ByteBuffer buf : fragments) + { + src.put(buf.duplicate()); + } + src.flip(); + + } + + try + { + int startBarePos = -1; + int lastPos = src.position(); + Section s = decoder.readSection(src); + + + + if(s instanceof Header) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof DeliveryAnnotations) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof MessageAnnotations) + { + sections.add(s); + lastPos = src.position(); + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof Properties) + { + sections.add(s); + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof ApplicationProperties) + { + sections.add(s); + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + + if(s instanceof AmqpValue) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + else if(s instanceof Data) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + do + { + s = src.hasRemaining() ? decoder.readSection(src) : null; + } while(s instanceof Data); + } + else if(s instanceof AmqpSequence) + { + if(startBarePos == -1) + { + startBarePos = lastPos; + } + do + { + s = src.hasRemaining() ? decoder.readSection(src) : null; + } + while(s instanceof AmqpSequence); + } + + if(s instanceof Footer) + { + sections.add(s); + } + + + int pos = 0; + for(ByteBuffer buf : fragments) + { +/* + if(pos < startBarePos) + { + if(pos + buf.remaining() > startBarePos) + { + ByteBuffer dup = buf.duplicate(); + dup.position(dup.position()+startBarePos-pos); + dup.slice(); + encodedSections.add(dup); + } + } + else +*/ + { + encodedSections.add(buf.duplicate()); + } + pos += buf.remaining(); + } + + return sections; + } + catch (AmqpErrorException e) + { + e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates. + throw new IllegalArgumentException(e); + } + } + + + public MessageMetaDataType getType() + { + return TYPE; + } + + + public int getStorableSize() + { + int size = 0; + + for(ByteBuffer bin : _encodedSections) + { + size += bin.limit(); + } + + return size; + } + + private ByteBuffer encodeAsBuffer() + { + ByteBuffer buf = ByteBuffer.allocate(getStorableSize()); + + for(ByteBuffer bin : _encodedSections) + { + buf.put(bin.duplicate()); + } + + return buf; + } + + public int writeToBuffer(int offsetInMetaData, ByteBuffer dest) + { + ByteBuffer buf = _encoded; + + if(buf == null) + { + buf = encodeAsBuffer(); + _encoded = buf; + } + + buf = buf.duplicate(); + + buf.position(offsetInMetaData); + + if(dest.remaining() < buf.limit()) + { + buf.limit(dest.remaining()); + } + dest.put(buf); + return buf.limit(); + } + + public int getContentSize() + { + ByteBuffer buf = _encoded; + + if(buf == null) + { + buf = encodeAsBuffer(); + _encoded = buf; + } + return buf.remaining(); + } + + public boolean isPersistent() + { + return _header != null && Boolean.TRUE.equals(_header.getDurable()); + } + + public MessageHeader_1_0 getMessageHeader() + { + return _messageHeader; + } + + + + + private static class MetaDataFactory implements MessageMetaDataType.Factory + { + private final AMQPDescribedTypeRegistry _typeRegistry = AMQPDescribedTypeRegistry.newInstance(); + + private MetaDataFactory() + { + _typeRegistry.registerTransportLayer(); + _typeRegistry.registerMessagingLayer(); + _typeRegistry.registerTransactionLayer(); + _typeRegistry.registerSecurityLayer(); + } + + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) + { + ValueHandler valueHandler = new ValueHandler(_typeRegistry); + + ArrayList
sections = new ArrayList
(3); + ArrayList encodedSections = new ArrayList(3); + + while(buf.hasRemaining()) + { + try + { + ByteBuffer encodedBuf = buf.duplicate(); + Object parse = valueHandler.parse(buf); + sections.add((Section) parse); + encodedBuf.limit(buf.position()); + encodedSections.add(encodedBuf); + + } + catch (AmqpErrorException e) + { + //TODO + throw new RuntimeException(e); + } + + } + + return new MessageMetaData_1_0(sections,encodedSections); + + } + } + + public class MessageHeader_1_0 implements AMQMessageHeader + { + + public String getCorrelationId() + { + if(_properties == null || _properties.getCorrelationId() == null) + { + return null; + } + else + { + return _properties.getMessageId().toString(); + } + } + + public long getExpiration() + { + return 0; //TODO + } + + public String getMessageId() + { + if(_properties == null || _properties.getCorrelationId() == null) + { + return null; + } + else + { + return _properties.getCorrelationId().toString(); + } + } + + public String getMimeType() + { + + if(_properties == null || _properties.getContentType() == null) + { + return null; + } + else + { + return _properties.getContentType().toString(); + } + } + + public String getEncoding() + { + return null; //TODO + } + + public byte getPriority() + { + if(_header == null || _header.getPriority() == null) + { + return 4; //javax.jms.Message.DEFAULT_PRIORITY; + } + else + { + return _header.getPriority().byteValue(); + } + } + + public long getTimestamp() + { + if(_properties == null || _properties.getCreationTime() == null) + { + return 0L; + } + else + { + return _properties.getCreationTime().getTime(); + } + + } + + public String getType() + { + + if(_messageAnnotations == null || _messageAnnotations.get(JMS_TYPE) == null) + { + return null; + } + else + { + return _messageAnnotations.get(JMS_TYPE).toString(); + } + } + + public String getReplyTo() + { + if(_properties == null || _properties.getReplyTo() == null) + { + return null; + } + else + { + return _properties.getReplyTo().toString(); + } + } + + public String getReplyToExchange() + { + return null; //TODO + } + + public String getReplyToRoutingKey() + { + return null; //TODO + } + + public String getAppId() + { + //TODO + return null; + } + + public String getUserId() + { + // TODO + return null; + } + + public Object getHeader(final String name) + { + return _appProperties == null ? null : _appProperties.get(name); + } + + public boolean containsHeaders(final Set names) + { + if(_appProperties == null) + { + return false; + } + + for(String key : names) + { + if(!_appProperties.containsKey(key)) + { + return false; + } + } + return true; + } + + @Override + public Collection getHeaderNames() + { + if(_appProperties == null) + { + return Collections.emptySet(); + } + return Collections.unmodifiableCollection(_appProperties.keySet()); + } + + public boolean containsHeader(final String name) + { + return _appProperties != null && _appProperties.containsKey(name); + } + + public String getSubject() + { + return _properties == null ? null : _properties.getSubject(); + } + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index c87028e190..9dc063e3ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -28,12 +28,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 implements ServerMessage, InboundMessage +public class Message_1_0 implements ServerMessage, InboundMessage { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3e2652b3b8..e971672767 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -41,7 +41,6 @@ import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index a4e9046d84..36eac879b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -64,7 +64,6 @@ import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index fdb36c9013..905c83f7ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -45,6 +45,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonParseException; @@ -1055,8 +1056,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; - StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); messageHandler.message(message); } @@ -1307,8 +1308,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; - StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); return metaData; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java deleted file mode 100755 index df17ae8318..0000000000 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java +++ /dev/null @@ -1,44 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.message.MessageMetaData_1_0; - -import java.nio.ByteBuffer; - -public enum MessageMetaDataType -{ - META_DATA_0_8 { public Factory getFactory() { return MessageMetaData.FACTORY; } }, - META_DATA_0_10 { public Factory getFactory() { return MessageMetaData_0_10.FACTORY; } }, - META_DATA_1_0 { public Factory getFactory() { return MessageMetaData_1_0.FACTORY; } }; - - - - public static interface Factory - { - M createMetaData(ByteBuffer buf); - } - - abstract public Factory getFactory(); - -} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java new file mode 100644 index 0000000000..64f3ab15ee --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.plugin.QpidServiceLoader; + +public class MessageMetaDataTypeRegistry +{ + private static MessageMetaDataType[] values; + + static + { + int maxOrdinal = -1; + + Iterable messageMetaDataTypes = + new QpidServiceLoader().atLeastOneInstanceOf(MessageMetaDataType.class); + + for(MessageMetaDataType type : messageMetaDataTypes) + { + if(type.ordinal()>maxOrdinal) + { + maxOrdinal = type.ordinal(); + } + } + values = new MessageMetaDataType[maxOrdinal+1]; + for(MessageMetaDataType type : new QpidServiceLoader().instancesOf(MessageMetaDataType.class)) + { + if(values[type.ordinal()] != null) + { + throw new IllegalStateException("Multiple MessageDataType (" + +values[type.ordinal()].getClass().getName() + +", " + + type.getClass().getName() + + ") defined for the same ordinal value: " + type.ordinal()); + } + values[type.ordinal()] = type; + } + } + + + public static MessageMetaDataType fromOrdinal(int ordinal) + { + return values[ordinal]; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java index 12d2a6a6c7..9ae6cca8e6 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; +import org.apache.qpid.server.plugin.MessageMetaDataType; public interface StorableMessageMetaData { @@ -34,3 +35,4 @@ public interface StorableMessageMetaData boolean isPersistent(); } + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 1974584581..f7cb6e2bed 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -174,21 +174,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - ServerMessage serverMessage; - switch(message.getMetaData().getType()) - { - case META_DATA_0_8: - serverMessage = new AMQMessage(message); - break; - case META_DATA_0_10: - serverMessage = new MessageTransferMessage(message, null); - break; - case META_DATA_1_0: - serverMessage = new Message_1_0(message); - break; - default: - throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); - } + ServerMessage serverMessage = message.getMetaData().getType().createMessage(message); _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType new file mode 100644 index 0000000000..9aa1d4ce11 --- /dev/null +++ b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8 +org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10 +org.apache.qpid.server.protocol.v1_0.MessageMetaDataType_1_0 -- cgit v1.2.1