diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 11:59:18 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-07-15 11:59:18 +0000 |
| commit | c9de50719b7d7566cc4f0f2cae39bbcf824420de (patch) | |
| tree | 9ed4e546a0eced873e77d5f69c46e3f49c77a2e1 /qpid/java/broker/src | |
| parent | e745b78b3f111daa2c76ddb9cd1afd4ca10417e1 (diff) | |
| download | qpid-python-c9de50719b7d7566cc4f0f2cae39bbcf824420de.tar.gz | |
QPID-4659 : [Java Broker] make message meta data pluggable for different protcol versions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
15 files changed, 321 insertions, 44 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java index df17ae8318..ee89782307 100755..100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java @@ -18,27 +18,25 @@ * under the License. * */ -package org.apache.qpid.server.store; - -import org.apache.qpid.server.protocol.v0_8.MessageMetaData; -import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10; -import org.apache.qpid.server.message.MessageMetaData_1_0; +package org.apache.qpid.server.plugin; import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.store.StorableMessageMetaData; +import org.apache.qpid.server.store.StoredMessage; -public enum MessageMetaDataType +public interface MessageMetaDataType<M extends StorableMessageMetaData> extends Pluggable { - META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } }, - META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } }, - META_DATA_1_0 { public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } }; - - public static interface Factory<M extends StorableMessageMetaData> { M createMetaData(ByteBuffer buf); } - abstract public Factory<? extends StorableMessageMetaData> getFactory(); + public int ordinal(); + + public M createMetaData(ByteBuffer buf); + + public ServerMessage<M> createMessage(StoredMessage<M> msg); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java new file mode 100644 index 0000000000..90fb443f5b --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_10; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10> +{ + + public static final int TYPE = 1; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData_0_10 createMetaData(ByteBuffer buf) + { + return MessageMetaData_0_10.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg) + { + return new MessageTransferMessage(msg, null); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v0_10.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java index 2d17351dad..ee2a40a5b2 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java @@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.transport.DeliveryProperties; import org.apache.qpid.transport.Header; @@ -51,6 +51,8 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10(); + private volatile ByteBuffer _encoded; private Object _connectionReference; @@ -60,7 +62,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis()); } - private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) + public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime) { _header = header; if(_header != null) @@ -83,7 +85,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes public MessageMetaDataType getType() { - return MessageMetaDataType.META_DATA_0_10; + return TYPE; } public int getStorableSize() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java index c539f793fe..4cc590d8cc 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java @@ -29,7 +29,7 @@ import org.apache.qpid.framing.EncodingUtils; import org.apache.qpid.framing.FieldTable; import org.apache.qpid.framing.abstraction.MessagePublishInfo; import org.apache.qpid.server.message.AMQMessageHeader; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.util.ByteBufferOutputStream; import org.apache.qpid.util.ByteBufferInputStream; @@ -56,6 +56,7 @@ public class MessageMetaData implements StorableMessageMetaData private static final byte MANDATORY_FLAG = 1; private static final byte IMMEDIATE_FLAG = 2; public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8(); public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount) { @@ -112,7 +113,7 @@ public class MessageMetaData implements StorableMessageMetaData public MessageMetaDataType getType() { - return MessageMetaDataType.META_DATA_0_8; + return TYPE; } public int getStorableSize() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java new file mode 100644 index 0000000000..9b50127ec7 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v0_8; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaData> +{ + + public static final int TYPE = 0; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData createMetaData(ByteBuffer buf) + { + return MessageMetaData.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage<MessageMetaData> createMessage(StoredMessage<MessageMetaData> msg) + { + return new AMQMessage(msg); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v0_8.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java new file mode 100644 index 0000000000..44b1de74e1 --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java @@ -0,0 +1,67 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import java.nio.ByteBuffer; +import org.apache.qpid.server.message.ServerMessage; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.protocol.AmqpProtocolVersion; +import org.apache.qpid.server.store.StoredMessage; + +public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaData_1_0> +{ + + public static final int TYPE = 2; + + @Override + public int ordinal() + { + return TYPE; + } + + @Override + public MessageMetaData_1_0 createMetaData(ByteBuffer buf) + { + return MessageMetaData_1_0.FACTORY.createMetaData(buf); + } + + @Override + public ServerMessage<MessageMetaData_1_0> createMessage(StoredMessage<MessageMetaData_1_0> msg) + { + return new Message_1_0(msg); + } + + public int hashCode() + { + return ordinal(); + } + + public boolean equals(Object o) + { + return o != null && o.getClass() == getClass(); + } + + @Override + public String getType() + { + return AmqpProtocolVersion.v1_0_0.toString(); + } +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java index 1d8b239733..8d48d70d9a 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.server.message; +package org.apache.qpid.server.protocol.v1_0; import java.nio.ByteBuffer; import java.util.*; @@ -38,13 +38,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Footer; import org.apache.qpid.amqp_1_0.type.messaging.Header; import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; import org.apache.qpid.amqp_1_0.type.messaging.Properties; -import org.apache.qpid.server.store.MessageMetaDataType; +import org.apache.qpid.server.message.AMQMessageHeader; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.store.StorableMessageMetaData; public class MessageMetaData_1_0 implements StorableMessageMetaData { // TODO move to somewhere more useful public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type"); + public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory(); + private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0(); private Header _header; @@ -280,7 +283,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData public MessageMetaDataType getType() { - return MessageMetaDataType.META_DATA_1_0; + return TYPE; } @@ -352,7 +355,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData return _messageHeader; } - public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory(); + private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0> diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java index c87028e190..9dc063e3ea 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java @@ -28,12 +28,11 @@ import java.util.List; import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.InboundMessage; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.store.StoredMessage; -public class Message_1_0 implements ServerMessage, InboundMessage +public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage { diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java index 3e2652b3b8..e971672767 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java @@ -41,7 +41,6 @@ import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; import org.apache.qpid.amqp_1_0.type.transport.Detach; import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; import org.apache.qpid.amqp_1_0.type.transport.Transfer; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.txn.AutoCommitTransaction; import org.apache.qpid.server.txn.ServerTransaction; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java index a4e9046d84..36eac879b9 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java @@ -64,7 +64,6 @@ import org.apache.qpid.typedmessage.TypedBytesFormatException; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.protocol.v0_8.AMQMessage; -import org.apache.qpid.server.message.MessageMetaData_1_0; import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.protocol.AMQSessionModel; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index fdb36c9013..905c83f7ed 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -45,6 +45,7 @@ import org.apache.qpid.AMQException; import org.apache.qpid.AMQStoreException; import org.apache.qpid.server.message.EnqueableMessage; import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.plugin.MessageMetaDataType; import org.apache.qpid.server.queue.AMQQueue; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonParseException; @@ -1055,8 +1056,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; - StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true); messageHandler.message(message); } @@ -1307,8 +1308,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC ByteBuffer buf = ByteBuffer.wrap(dataAsBytes); buf.position(1); buf = buf.slice(); - MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]]; - StorableMessageMetaData metaData = type.getFactory().createMetaData(buf); + MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]); + StorableMessageMetaData metaData = type.createMetaData(buf); return metaData; } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java new file mode 100644 index 0000000000..64f3ab15ee --- /dev/null +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java @@ -0,0 +1,65 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.plugin.QpidServiceLoader; + +public class MessageMetaDataTypeRegistry +{ + private static MessageMetaDataType[] values; + + static + { + int maxOrdinal = -1; + + Iterable<MessageMetaDataType> messageMetaDataTypes = + new QpidServiceLoader<MessageMetaDataType>().atLeastOneInstanceOf(MessageMetaDataType.class); + + for(MessageMetaDataType type : messageMetaDataTypes) + { + if(type.ordinal()>maxOrdinal) + { + maxOrdinal = type.ordinal(); + } + } + values = new MessageMetaDataType[maxOrdinal+1]; + for(MessageMetaDataType type : new QpidServiceLoader<MessageMetaDataType>().instancesOf(MessageMetaDataType.class)) + { + if(values[type.ordinal()] != null) + { + throw new IllegalStateException("Multiple MessageDataType (" + +values[type.ordinal()].getClass().getName() + +", " + + type.getClass().getName() + + ") defined for the same ordinal value: " + type.ordinal()); + } + values[type.ordinal()] = type; + } + } + + + public static MessageMetaDataType fromOrdinal(int ordinal) + { + return values[ordinal]; + } + +} diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java index 12d2a6a6c7..9ae6cca8e6 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; +import org.apache.qpid.server.plugin.MessageMetaDataType; public interface StorableMessageMetaData { @@ -34,3 +35,4 @@ public interface StorableMessageMetaData boolean isPersistent(); } + diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java index 1974584581..f7cb6e2bed 100755 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java @@ -174,21 +174,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa public void message(StoredMessage message) { - ServerMessage serverMessage; - switch(message.getMetaData().getType()) - { - case META_DATA_0_8: - serverMessage = new AMQMessage(message); - break; - case META_DATA_0_10: - serverMessage = new MessageTransferMessage(message, null); - break; - case META_DATA_1_0: - serverMessage = new Message_1_0(message); - break; - default: - throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass()); - } + ServerMessage serverMessage = message.getMetaData().getType().createMessage(message); _recoveredMessages.put(message.getMessageNumber(), serverMessage); _unusedMessages.put(message.getMessageNumber(), message); diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType new file mode 100644 index 0000000000..9aa1d4ce11 --- /dev/null +++ b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType @@ -0,0 +1,21 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8 +org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10 +org.apache.qpid.server.protocol.v1_0.MessageMetaDataType_1_0 |
