summaryrefslogtreecommitdiff
path: root/qpid/java/broker/src
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2013-07-15 11:59:18 +0000
committerRobert Godfrey <rgodfrey@apache.org>2013-07-15 11:59:18 +0000
commitc9de50719b7d7566cc4f0f2cae39bbcf824420de (patch)
tree9ed4e546a0eced873e77d5f69c46e3f49c77a2e1 /qpid/java/broker/src
parente745b78b3f111daa2c76ddb9cd1afd4ca10417e1 (diff)
downloadqpid-python-c9de50719b7d7566cc4f0f2cae39bbcf824420de.tar.gz
QPID-4659 : [Java Broker] make message meta data pluggable for different protcol versions
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1503192 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker/src')
-rw-r--r--[-rwxr-xr-x]qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java)22
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java67
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java8
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java5
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java67
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java67
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (renamed from qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java)11
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java3
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java1
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java9
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java65
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java2
-rwxr-xr-xqpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java16
-rw-r--r--qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType21
15 files changed, 321 insertions, 44 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
index df17ae8318..ee89782307 100755..100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataType.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/plugin/MessageMetaDataType.java
@@ -18,27 +18,25 @@
* under the License.
*
*/
-package org.apache.qpid.server.store;
-
-import org.apache.qpid.server.protocol.v0_8.MessageMetaData;
-import org.apache.qpid.server.protocol.v0_10.MessageMetaData_0_10;
-import org.apache.qpid.server.message.MessageMetaData_1_0;
+package org.apache.qpid.server.plugin;
import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.store.StorableMessageMetaData;
+import org.apache.qpid.server.store.StoredMessage;
-public enum MessageMetaDataType
+public interface MessageMetaDataType<M extends StorableMessageMetaData> extends Pluggable
{
- META_DATA_0_8 { public Factory<MessageMetaData> getFactory() { return MessageMetaData.FACTORY; } },
- META_DATA_0_10 { public Factory<MessageMetaData_0_10> getFactory() { return MessageMetaData_0_10.FACTORY; } },
- META_DATA_1_0 { public Factory<MessageMetaData_1_0> getFactory() { return MessageMetaData_1_0.FACTORY; } };
-
-
public static interface Factory<M extends StorableMessageMetaData>
{
M createMetaData(ByteBuffer buf);
}
- abstract public Factory<? extends StorableMessageMetaData> getFactory();
+ public int ordinal();
+
+ public M createMetaData(ByteBuffer buf);
+
+ public ServerMessage<M> createMessage(StoredMessage<M> msg);
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
new file mode 100644
index 0000000000..90fb443f5b
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaDataType_0_10.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_10;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class MessageMetaDataType_0_10 implements MessageMetaDataType<MessageMetaData_0_10>
+{
+
+ public static final int TYPE = 1;
+
+ @Override
+ public int ordinal()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public MessageMetaData_0_10 createMetaData(ByteBuffer buf)
+ {
+ return MessageMetaData_0_10.FACTORY.createMetaData(buf);
+ }
+
+ @Override
+ public ServerMessage<MessageMetaData_0_10> createMessage(StoredMessage<MessageMetaData_0_10> msg)
+ {
+ return new MessageTransferMessage(msg, null);
+ }
+
+ public int hashCode()
+ {
+ return ordinal();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o != null && o.getClass() == getClass();
+ }
+
+ @Override
+ public String getType()
+ {
+ return AmqpProtocolVersion.v0_10.toString();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
index 2d17351dad..ee2a40a5b2 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_10/MessageMetaData_0_10.java
@@ -23,7 +23,7 @@ package org.apache.qpid.server.protocol.v0_10;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.message.AMQMessageHeader;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.transport.DeliveryProperties;
import org.apache.qpid.transport.Header;
@@ -51,6 +51,8 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
public static final MessageMetaDataType.Factory<MessageMetaData_0_10> FACTORY = new MetaDataFactory();
+ private static final MessageMetaDataType_0_10 TYPE = new MessageMetaDataType_0_10();
+
private volatile ByteBuffer _encoded;
private Object _connectionReference;
@@ -60,7 +62,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
this(xfr.getHeader(), xfr.getBodySize(), System.currentTimeMillis());
}
- private MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
+ public MessageMetaData_0_10(Header header, int bodySize, long arrivalTime)
{
_header = header;
if(_header != null)
@@ -83,7 +85,7 @@ public class MessageMetaData_0_10 implements StorableMessageMetaData, InboundMes
public MessageMetaDataType getType()
{
- return MessageMetaDataType.META_DATA_0_10;
+ return TYPE;
}
public int getStorableSize()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
index c539f793fe..4cc590d8cc 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaData.java
@@ -29,7 +29,7 @@ import org.apache.qpid.framing.EncodingUtils;
import org.apache.qpid.framing.FieldTable;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.message.AMQMessageHeader;
-import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.util.ByteBufferOutputStream;
import org.apache.qpid.util.ByteBufferInputStream;
@@ -56,6 +56,7 @@ public class MessageMetaData implements StorableMessageMetaData
private static final byte MANDATORY_FLAG = 1;
private static final byte IMMEDIATE_FLAG = 2;
public static final MessageMetaDataType.Factory<MessageMetaData> FACTORY = new MetaDataFactory();
+ private static final MessageMetaDataType_0_8 TYPE = new MessageMetaDataType_0_8();
public MessageMetaData(MessagePublishInfo publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
{
@@ -112,7 +113,7 @@ public class MessageMetaData implements StorableMessageMetaData
public MessageMetaDataType getType()
{
- return MessageMetaDataType.META_DATA_0_8;
+ return TYPE;
}
public int getStorableSize()
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
new file mode 100644
index 0000000000..9b50127ec7
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v0_8/MessageMetaDataType_0_8.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v0_8;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class MessageMetaDataType_0_8 implements MessageMetaDataType<MessageMetaData>
+{
+
+ public static final int TYPE = 0;
+
+ @Override
+ public int ordinal()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public MessageMetaData createMetaData(ByteBuffer buf)
+ {
+ return MessageMetaData.FACTORY.createMetaData(buf);
+ }
+
+ @Override
+ public ServerMessage<MessageMetaData> createMessage(StoredMessage<MessageMetaData> msg)
+ {
+ return new AMQMessage(msg);
+ }
+
+ public int hashCode()
+ {
+ return ordinal();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o != null && o.getClass() == getClass();
+ }
+
+ @Override
+ public String getType()
+ {
+ return AmqpProtocolVersion.v0_8.toString();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
new file mode 100644
index 0000000000..44b1de74e1
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaDataType_1_0.java
@@ -0,0 +1,67 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.protocol.v1_0;
+
+import java.nio.ByteBuffer;
+import org.apache.qpid.server.message.ServerMessage;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.protocol.AmqpProtocolVersion;
+import org.apache.qpid.server.store.StoredMessage;
+
+public class MessageMetaDataType_1_0 implements MessageMetaDataType<MessageMetaData_1_0>
+{
+
+ public static final int TYPE = 2;
+
+ @Override
+ public int ordinal()
+ {
+ return TYPE;
+ }
+
+ @Override
+ public MessageMetaData_1_0 createMetaData(ByteBuffer buf)
+ {
+ return MessageMetaData_1_0.FACTORY.createMetaData(buf);
+ }
+
+ @Override
+ public ServerMessage<MessageMetaData_1_0> createMessage(StoredMessage<MessageMetaData_1_0> msg)
+ {
+ return new Message_1_0(msg);
+ }
+
+ public int hashCode()
+ {
+ return ordinal();
+ }
+
+ public boolean equals(Object o)
+ {
+ return o != null && o.getClass() == getClass();
+ }
+
+ @Override
+ public String getType()
+ {
+ return AmqpProtocolVersion.v1_0_0.toString();
+ }
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
index 1d8b239733..8d48d70d9a 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/message/MessageMetaData_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java
@@ -18,7 +18,7 @@
* under the License.
*
*/
-package org.apache.qpid.server.message;
+package org.apache.qpid.server.protocol.v1_0;
import java.nio.ByteBuffer;
import java.util.*;
@@ -38,13 +38,16 @@ import org.apache.qpid.amqp_1_0.type.messaging.Footer;
import org.apache.qpid.amqp_1_0.type.messaging.Header;
import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations;
import org.apache.qpid.amqp_1_0.type.messaging.Properties;
-import org.apache.qpid.server.store.MessageMetaDataType;
+import org.apache.qpid.server.message.AMQMessageHeader;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.store.StorableMessageMetaData;
public class MessageMetaData_1_0 implements StorableMessageMetaData
{
// TODO move to somewhere more useful
public static final Symbol JMS_TYPE = Symbol.valueOf("x-opt-jms-type");
+ public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+ private static final MessageMetaDataType_1_0 TYPE = new MessageMetaDataType_1_0();
private Header _header;
@@ -280,7 +283,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
public MessageMetaDataType getType()
{
- return MessageMetaDataType.META_DATA_1_0;
+ return TYPE;
}
@@ -352,7 +355,7 @@ public class MessageMetaData_1_0 implements StorableMessageMetaData
return _messageHeader;
}
- public static final MessageMetaDataType.Factory<MessageMetaData_1_0> FACTORY = new MetaDataFactory();
+
private static class MetaDataFactory implements MessageMetaDataType.Factory<MessageMetaData_1_0>
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
index c87028e190..9dc063e3ea 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java
@@ -28,12 +28,11 @@ import java.util.List;
import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.server.message.InboundMessage;
-import org.apache.qpid.server.message.MessageMetaData_1_0;
import org.apache.qpid.server.message.MessageReference;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.store.StoredMessage;
-public class Message_1_0 implements ServerMessage, InboundMessage
+public class Message_1_0 implements ServerMessage<MessageMetaData_1_0>, InboundMessage
{
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
index 3e2652b3b8..e971672767 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java
@@ -41,7 +41,6 @@ import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState;
import org.apache.qpid.amqp_1_0.type.transport.Detach;
import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode;
import org.apache.qpid.amqp_1_0.type.transport.Transfer;
-import org.apache.qpid.server.message.MessageMetaData_1_0;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.txn.AutoCommitTransaction;
import org.apache.qpid.server.txn.ServerTransaction;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
index a4e9046d84..36eac879b9 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/protocol/v1_0/Subscription_1_0.java
@@ -64,7 +64,6 @@ import org.apache.qpid.typedmessage.TypedBytesFormatException;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.protocol.v0_8.AMQMessage;
-import org.apache.qpid.server.message.MessageMetaData_1_0;
import org.apache.qpid.server.protocol.v0_10.MessageTransferMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.protocol.AMQSessionModel;
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index fdb36c9013..905c83f7ed 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -45,6 +45,7 @@ import org.apache.qpid.AMQException;
import org.apache.qpid.AMQStoreException;
import org.apache.qpid.server.message.EnqueableMessage;
import org.apache.qpid.server.model.VirtualHost;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
import org.apache.qpid.server.queue.AMQQueue;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonParseException;
@@ -1055,8 +1056,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
StoredJDBCMessage message = new StoredJDBCMessage(messageId, metaData, true);
messageHandler.message(message);
}
@@ -1307,8 +1308,8 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
ByteBuffer buf = ByteBuffer.wrap(dataAsBytes);
buf.position(1);
buf = buf.slice();
- MessageMetaDataType type = MessageMetaDataType.values()[dataAsBytes[0]];
- StorableMessageMetaData metaData = type.getFactory().createMetaData(buf);
+ MessageMetaDataType type = MessageMetaDataTypeRegistry.fromOrdinal(dataAsBytes[0]);
+ StorableMessageMetaData metaData = type.createMetaData(buf);
return metaData;
}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java
new file mode 100644
index 0000000000..64f3ab15ee
--- /dev/null
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageMetaDataTypeRegistry.java
@@ -0,0 +1,65 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.plugin.QpidServiceLoader;
+
+public class MessageMetaDataTypeRegistry
+{
+ private static MessageMetaDataType[] values;
+
+ static
+ {
+ int maxOrdinal = -1;
+
+ Iterable<MessageMetaDataType> messageMetaDataTypes =
+ new QpidServiceLoader<MessageMetaDataType>().atLeastOneInstanceOf(MessageMetaDataType.class);
+
+ for(MessageMetaDataType type : messageMetaDataTypes)
+ {
+ if(type.ordinal()>maxOrdinal)
+ {
+ maxOrdinal = type.ordinal();
+ }
+ }
+ values = new MessageMetaDataType[maxOrdinal+1];
+ for(MessageMetaDataType type : new QpidServiceLoader<MessageMetaDataType>().instancesOf(MessageMetaDataType.class))
+ {
+ if(values[type.ordinal()] != null)
+ {
+ throw new IllegalStateException("Multiple MessageDataType ("
+ +values[type.ordinal()].getClass().getName()
+ +", "
+ + type.getClass().getName()
+ + ") defined for the same ordinal value: " + type.ordinal());
+ }
+ values[type.ordinal()] = type;
+ }
+ }
+
+
+ public static MessageMetaDataType fromOrdinal(int ordinal)
+ {
+ return values[ordinal];
+ }
+
+}
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
index 12d2a6a6c7..9ae6cca8e6 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StorableMessageMetaData.java
@@ -21,6 +21,7 @@
package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
public interface StorableMessageMetaData
{
@@ -34,3 +35,4 @@ public interface StorableMessageMetaData
boolean isPersistent();
}
+
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
index 1974584581..f7cb6e2bed 100755
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/virtualhost/VirtualHostConfigRecoveryHandler.java
@@ -174,21 +174,7 @@ public class VirtualHostConfigRecoveryHandler implements ConfigurationRecoveryHa
public void message(StoredMessage message)
{
- ServerMessage serverMessage;
- switch(message.getMetaData().getType())
- {
- case META_DATA_0_8:
- serverMessage = new AMQMessage(message);
- break;
- case META_DATA_0_10:
- serverMessage = new MessageTransferMessage(message, null);
- break;
- case META_DATA_1_0:
- serverMessage = new Message_1_0(message);
- break;
- default:
- throw new RuntimeException("Unknown message type retrieved from store " + message.getMetaData().getClass());
- }
+ ServerMessage serverMessage = message.getMetaData().getType().createMessage(message);
_recoveredMessages.put(message.getMessageNumber(), serverMessage);
_unusedMessages.put(message.getMessageNumber(), message);
diff --git a/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
new file mode 100644
index 0000000000..9aa1d4ce11
--- /dev/null
+++ b/qpid/java/broker/src/test/resources/META-INF/services/org.apache.qpid.server.plugin.MessageMetaDataType
@@ -0,0 +1,21 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+#
+org.apache.qpid.server.protocol.v0_8.MessageMetaDataType_0_8
+org.apache.qpid.server.protocol.v0_10.MessageMetaDataType_0_10
+org.apache.qpid.server.protocol.v1_0.MessageMetaDataType_1_0