summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core/src
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2014-06-06 15:43:08 +0000
committerKeith Wall <kwall@apache.org>2014-06-06 15:43:08 +0000
commit39249098b7b374c5e45d7139aa8b9df3aebad385 (patch)
treeab13b41b26d2036f5765e3a95b8692fe3903ce54 /qpid/java/broker-core/src
parent53fd008b70676ce1382bec414bcd0d86299a4ced (diff)
downloadqpid-python-39249098b7b374c5e45d7139aa8b9df3aebad385.tar.gz
QPID-5800: [Java Broker} Refactor MessageStore implementations extracting a MessageStoreProvider interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java371
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java109
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java308
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java26
-rwxr-xr-xqpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java5
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java32
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java47
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java23
-rw-r--r--qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java4
13 files changed, 562 insertions, 414 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index e0c1f77d2b..1db2c89d05 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -46,15 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.log4j.Logger;
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.BrokerModel;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.model.UUIDGenerator;
-import org.apache.qpid.server.plugin.MessageMetaDataType;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
import org.codehaus.jackson.JsonGenerationException;
import org.codehaus.jackson.JsonGenerator;
import org.codehaus.jackson.JsonParseException;
@@ -67,7 +58,17 @@ import org.codehaus.jackson.map.ObjectMapper;
import org.codehaus.jackson.map.SerializerProvider;
import org.codehaus.jackson.map.module.SimpleModule;
-abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.BrokerModel;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.model.UUIDGenerator;
+import org.apache.qpid.server.plugin.MessageMetaDataType;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore
{
private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION";
private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION";
@@ -369,7 +370,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
{
if (_messageStoreOpen.compareAndSet(false, true))
@@ -952,7 +952,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
public void closeMessageStore()
{
if (_messageStoreOpen.compareAndSet(true, false))
@@ -978,7 +977,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract void doClose();
- @Override
public StoredMessage addMessage(StorableMessageMetaData metaData)
{
checkMessageStoreOpen();
@@ -1132,7 +1130,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract Connection getConnection() throws SQLException;
- @Override
public Transaction newTransaction()
{
checkMessageStoreOpen();
@@ -1665,7 +1662,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
- @Override
public boolean isPersistent()
{
return true;
@@ -1975,7 +1971,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void addEventListener(EventListener eventListener, Event... events)
{
_eventManager.addEventListener(eventListener, events);
@@ -2250,7 +2245,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitMessages(MessageHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2301,7 +2295,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2346,7 +2339,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
}
}
- @Override
public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
{
checkMessageStoreOpen();
@@ -2447,8 +2439,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC
protected abstract void storedSizeChange(int storeSizeIncrease);
-
- @Override
public void onDelete()
{
// TODO should probably check we are closed
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
index e2c43f5012..e69de29bb2 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java
@@ -1,371 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-package org.apache.qpid.server.store;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.atomic.AtomicLong;
-
-import org.apache.qpid.server.message.EnqueueableMessage;
-import org.apache.qpid.server.model.ConfiguredObject;
-import org.apache.qpid.server.store.Transaction.Record;
-import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
-import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
-import org.apache.qpid.server.store.handler.MessageHandler;
-import org.apache.qpid.server.store.handler.MessageInstanceHandler;
-
-/** A simple message store that stores the messages in a thread-safe structure in memory. */
-abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore
-{
- private final class MemoryMessageStoreTransaction implements Transaction
- {
- private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
- private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
-
- private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
- private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
-
- @Override
- public StoreFuture commitTranAsync()
- {
- return StoreFuture.IMMEDIATE_FUTURE;
- }
-
- @Override
- public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _localEnqueueMap.put(queue.getId(), messageIds);
- }
- messageIds.add(message.getMessageNumber());
- }
-
- @Override
- public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
- {
- Set<Long> messageIds = _localDequeueMap.get(queue.getId());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _localDequeueMap.put(queue.getId(), messageIds);
- }
- messageIds.add(message.getMessageNumber());
- }
-
- @Override
- public void commitTran()
- {
- commitTransactionInternal(this);
- _localEnqueueMap.clear();
- _localDequeueMap.clear();
- }
-
- @Override
- public void abortTran()
- {
- _localEnqueueMap.clear();
- _localDequeueMap.clear();
- }
-
- @Override
- public void removeXid(long format, byte[] globalId, byte[] branchId)
- {
- _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
- }
-
- @Override
- public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
- {
- _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
- }
- }
-
- private final AtomicLong _messageId = new AtomicLong(1);
-
- private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
-
- protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
-
- private Object _transactionLock = new Object();
- private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
- private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
-
- @SuppressWarnings("unchecked")
- @Override
- public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData)
- {
- long id = _messageId.getAndIncrement();
-
- if(metaData.isPersistent())
- {
- return new StoredMemoryMessage(id, metaData)
- {
-
- @Override
- public StoreFuture flushToStore()
- {
- _messages.putIfAbsent(getMessageNumber(), this) ;
- return super.flushToStore();
- }
-
- @Override
- public void remove()
- {
- _messages.remove(getMessageNumber());
- super.remove();
- }
-
- };
- }
- else
- {
- return new StoredMemoryMessage(id, metaData);
- }
- }
-
- private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
- {
- synchronized (_transactionLock )
- {
- for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet())
- {
- Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey());
- if (messageIds == null)
- {
- messageIds = new HashSet<Long>();
- _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds);
- }
- messageIds.addAll(loacalEnqueuedEntry.getValue());
- }
-
- for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
- {
- Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
- if (messageIds != null)
- {
- messageIds.removeAll(loacalDequeueEntry.getValue());
- if (messageIds.isEmpty())
- {
- _messageInstances.remove(loacalDequeueEntry.getKey());
- }
- }
- }
-
- for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
- {
- _distributedTransactions.put(entry.getKey(), entry.getValue());
- }
-
- for (Xid removed : transaction._localDistributedTransactionsRemoves)
- {
- _distributedTransactions.remove(removed);
- }
-
- }
-
-
- }
-
- @Override
- public Transaction newTransaction()
- {
- return new MemoryMessageStoreTransaction();
- }
-
- @Override
- public boolean isPersistent()
- {
- return false;
- }
-
- @Override
- public void addEventListener(EventListener eventListener, Event... events)
- {
- }
-
- @Override
- public void create(ConfiguredObjectRecord record)
- {
- if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
- {
- throw new StoreException("Record with id " + record.getId() + " is already present");
- }
- }
-
- @Override
- public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
- {
- for (ConfiguredObjectRecord record : records)
- {
- ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
- if (previousValue == null && !createIfNecessary)
- {
- throw new StoreException("Record with id " + record.getId() + " does not exist");
- }
- }
- }
-
- @Override
- public UUID[] remove(final ConfiguredObjectRecord... objects)
- {
- List<UUID> removed = new ArrayList<UUID>();
- for (ConfiguredObjectRecord record : objects)
- {
- if (_configuredObjectRecords.remove(record.getId()) != null)
- {
- removed.add(record.getId());
- }
- }
- return removed.toArray(new UUID[removed.size()]);
- }
-
- @Override
- public void closeConfigurationStore()
- {
- _configuredObjectRecords.clear();
- }
-
- @Override
- public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
- {
- }
-
- @Override
- public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
- {
- handler.begin();
- for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
- {
- if (!handler.handle(record))
- {
- break;
- }
- }
- handler.end();
- }
-
- @Override
- public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings)
- {
- }
-
- @Override
- public void closeMessageStore()
- {
- _messages.clear();
- synchronized (_transactionLock)
- {
- _messageInstances.clear();
- _distributedTransactions.clear();
- }
- }
-
- @Override
- public String getStoreLocation()
- {
- return null;
- }
-
- @Override
- public void onDelete()
- {
- }
-
- @Override
- public void visitMessages(MessageHandler handler) throws StoreException
- {
- for (StoredMemoryMessage message : _messages.values())
- {
- if(!handler.handle(message))
- {
- break;
- }
- }
- }
-
- @Override
- public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
- {
- for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
- {
- UUID resourceId = enqueuedEntry.getKey();
- for (Long messageId : enqueuedEntry.getValue())
- {
- if (!handler.handle(resourceId, messageId))
- {
- return;
- }
- }
- }
- }
- }
-
- @Override
- public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException
- {
- synchronized (_transactionLock)
- {
- for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
- {
- Xid xid = entry.getKey();
- DistributedTransactionRecords records = entry.getValue();
- if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues()))
- {
- break;
- }
- }
- }
- }
-
- private static final class DistributedTransactionRecords
- {
- private Record[] _enqueues;
- private Record[] _dequeues;
-
- public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues)
- {
- super();
- _enqueues = enqueues;
- _dequeues = dequeues;
- }
-
- public Record[] getEnqueues()
- {
- return _enqueues;
- }
-
- public Record[] getDequeues()
- {
- return _dequeues;
- }
- }
-}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
new file mode 100644
index 0000000000..267e1d9cb3
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java
@@ -0,0 +1,109 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
+
+abstract class AbstractMemoryStore implements DurableConfigurationStore, MessageStoreProvider
+{
+ private final MessageStore _messageStore = new MemoryMessageStore();
+
+
+ private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>();
+
+
+
+ @Override
+ public void create(ConfiguredObjectRecord record)
+ {
+ if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null)
+ {
+ throw new StoreException("Record with id " + record.getId() + " is already present");
+ }
+ }
+
+ @Override
+ public void update(boolean createIfNecessary, ConfiguredObjectRecord... records)
+ {
+ for (ConfiguredObjectRecord record : records)
+ {
+ ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record);
+ if (previousValue == null && !createIfNecessary)
+ {
+ throw new StoreException("Record with id " + record.getId() + " does not exist");
+ }
+ }
+ }
+
+ @Override
+ public UUID[] remove(final ConfiguredObjectRecord... objects)
+ {
+ List<UUID> removed = new ArrayList<UUID>();
+ for (ConfiguredObjectRecord record : objects)
+ {
+ if (_configuredObjectRecords.remove(record.getId()) != null)
+ {
+ removed.add(record.getId());
+ }
+ }
+ return removed.toArray(new UUID[removed.size()]);
+ }
+
+ @Override
+ public void closeConfigurationStore()
+ {
+ _configuredObjectRecords.clear();
+ }
+
+ @Override
+ public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings)
+ {
+ }
+
+ @Override
+ public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException
+ {
+ handler.begin();
+ for (ConfiguredObjectRecord record : _configuredObjectRecords.values())
+ {
+ if (!handler.handle(record))
+ {
+ break;
+ }
+ }
+ handler.end();
+ }
+
+ @Override
+ public MessageStore getMessageStore()
+ {
+ return _messageStore;
+ }
+
+
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
new file mode 100644
index 0000000000..44d640ca86
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
@@ -0,0 +1,308 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.qpid.server.message.EnqueueableMessage;
+import org.apache.qpid.server.model.ConfiguredObject;
+import org.apache.qpid.server.store.handler.DistributedTransactionHandler;
+import org.apache.qpid.server.store.handler.MessageHandler;
+import org.apache.qpid.server.store.handler.MessageInstanceHandler;
+
+/** A simple message store that stores the messages in a thread-safe structure in memory. */
+public class MemoryMessageStore implements MessageStore
+{
+ private final AtomicLong _messageId = new AtomicLong(1);
+
+ private final ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>();
+ private final Object _transactionLock = new Object();
+ private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>();
+ private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>();
+
+
+ private final class MemoryMessageStoreTransaction implements Transaction
+ {
+ private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>();
+ private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>();
+
+ private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>();
+ private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>();
+
+ @Override
+ public StoreFuture commitTranAsync()
+ {
+ return StoreFuture.IMMEDIATE_FUTURE;
+ }
+
+ @Override
+ public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ Set<Long> messageIds = _localEnqueueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localEnqueueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
+ }
+
+ @Override
+ public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message)
+ {
+ Set<Long> messageIds = _localDequeueMap.get(queue.getId());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _localDequeueMap.put(queue.getId(), messageIds);
+ }
+ messageIds.add(message.getMessageNumber());
+ }
+
+ @Override
+ public void commitTran()
+ {
+ commitTransactionInternal(this);
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
+ }
+
+ @Override
+ public void abortTran()
+ {
+ _localEnqueueMap.clear();
+ _localDequeueMap.clear();
+ }
+
+ @Override
+ public void removeXid(long format, byte[] globalId, byte[] branchId)
+ {
+ _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId));
+ }
+
+ @Override
+ public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues)
+ {
+ _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues));
+ }
+ }
+
+ private static final class DistributedTransactionRecords
+ {
+ private Transaction.Record[] _enqueues;
+ private Transaction.Record[] _dequeues;
+
+ public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues)
+ {
+ super();
+ _enqueues = enqueues;
+ _dequeues = dequeues;
+ }
+
+ public Transaction.Record[] getEnqueues()
+ {
+ return _enqueues;
+ }
+
+ public Transaction.Record[] getDequeues()
+ {
+ return _dequeues;
+ }
+ }
+
+ private void commitTransactionInternal(MemoryMessageStoreTransaction transaction)
+ {
+ synchronized (_transactionLock )
+ {
+ for (Map.Entry<UUID, Set<Long>> localEnqueuedEntry : transaction._localEnqueueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(localEnqueuedEntry.getKey());
+ if (messageIds == null)
+ {
+ messageIds = new HashSet<Long>();
+ _messageInstances.put(localEnqueuedEntry.getKey(), messageIds);
+ }
+ messageIds.addAll(localEnqueuedEntry.getValue());
+ }
+
+ for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet())
+ {
+ Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey());
+ if (messageIds != null)
+ {
+ messageIds.removeAll(loacalDequeueEntry.getValue());
+ if (messageIds.isEmpty())
+ {
+ _messageInstances.remove(loacalDequeueEntry.getKey());
+ }
+ }
+ }
+
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet())
+ {
+ _distributedTransactions.put(entry.getKey(), entry.getValue());
+ }
+
+ for (Xid removed : transaction._localDistributedTransactionsRemoves)
+ {
+ _distributedTransactions.remove(removed);
+ }
+
+ }
+
+
+ }
+
+
+ @Override
+ public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings)
+ {
+ }
+
+ @Override
+ public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData)
+ {
+ long id = _messageId.getAndIncrement();
+
+ if(metaData.isPersistent())
+ {
+ return new StoredMemoryMessage<T>(id, metaData)
+ {
+
+ @Override
+ public StoreFuture flushToStore()
+ {
+ _messages.putIfAbsent(getMessageNumber(), this) ;
+ return super.flushToStore();
+ }
+
+ @Override
+ public void remove()
+ {
+ _messages.remove(getMessageNumber());
+ super.remove();
+ }
+
+ };
+ }
+ else
+ {
+ return new StoredMemoryMessage<T>(id, metaData);
+ }
+ }
+
+ @Override
+ public boolean isPersistent()
+ {
+ return false;
+ }
+
+ @Override
+ public Transaction newTransaction()
+ {
+ return new MemoryMessageStoreTransaction();
+ }
+
+ @Override
+ public void closeMessageStore()
+ {
+ _messages.clear();
+ synchronized (_transactionLock)
+ {
+ _messageInstances.clear();
+ _distributedTransactions.clear();
+ }
+ }
+
+ @Override
+ public void addEventListener(final EventListener eventListener, final Event... events)
+ {
+ }
+
+ @Override
+ public String getStoreLocation()
+ {
+ return null;
+ }
+
+ @Override
+ public void onDelete()
+ {
+ }
+
+ @Override
+ public void visitMessages(final MessageHandler handler) throws StoreException
+ {
+ for (StoredMemoryMessage message : _messages.values())
+ {
+ if(!handler.handle(message))
+ {
+ break;
+ }
+ }
+ }
+
+ @Override
+ public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet())
+ {
+ UUID resourceId = enqueuedEntry.getKey();
+ for (Long messageId : enqueuedEntry.getValue())
+ {
+ if (!handler.handle(resourceId, messageId))
+ {
+ return;
+ }
+ }
+ }
+ }
+ }
+
+ @Override
+ public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException
+ {
+ synchronized (_transactionLock)
+ {
+ for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet())
+ {
+ Xid xid = entry.getKey();
+ DistributedTransactionRecords records = entry.getValue();
+ if (!handler.handle(xid.getFormat(),
+ xid.getGlobalId(),
+ xid.getBranchId(),
+ records.getEnqueues(),
+ records.getDequeues()))
+ {
+ break;
+ }
+ }
+ }
+ }
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java
new file mode 100644
index 0000000000..94d58013d2
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java
@@ -0,0 +1,26 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+public interface MessageStoreProvider
+{
+ MessageStore getMessageStore();
+}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
index e7302270bb..f4e1376980 100755
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java
@@ -23,13 +23,13 @@ package org.apache.qpid.server.store;
import java.nio.ByteBuffer;
-public class StoredMemoryMessage implements StoredMessage
+public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
{
private final long _messageNumber;
private ByteBuffer _content;
- private final StorableMessageMetaData _metaData;
+ private final T _metaData;
- public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData)
+ public StoredMemoryMessage(long messageNumber, T metaData)
{
_messageNumber = messageNumber;
_metaData = metaData;
@@ -128,7 +128,7 @@ public class StoredMemoryMessage implements StoredMessage
}
- public StorableMessageMetaData getMetaData()
+ public T getMetaData()
{
return _metaData;
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
index d27cd1c13e..9e10d5e424 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java
@@ -77,6 +77,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.Event;
import org.apache.qpid.server.store.EventListener;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
import org.apache.qpid.server.store.StoreException;
import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler;
import org.apache.qpid.server.txn.DtxRegistry;
@@ -196,13 +197,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
DurableConfigurationStore durableConfigurationStore = _virtualHostNode.getConfigurationStore();
+ // TODO attribute messageStoreProvider is to be removed
boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider();
if (nodeIsMessageStoreProvider)
{
- if (!(durableConfigurationStore instanceof MessageStore))
+ if (!(durableConfigurationStore instanceof MessageStoreProvider))
{
throw new IllegalConfigurationException("Virtual host node " + _virtualHostNode.getName()
- + " is configured as a provider of message store but the MessageStore interface is not implemented on a configuration store of type "
+ + " is configured as a provider of message store but the MessageStoreProvider interface is not implemented on a configuration store of type "
+ durableConfigurationStore.getClass().getName());
}
}
@@ -215,7 +217,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte
+ ". You can either configure the message store setting on the host or "
+ (durableConfigurationStore instanceof MessageStore ?
" configure VirtualHostNode " + _virtualHostNode.getName() + " as a provider of message store" :
- " change the node type to one having configuration store implementing the MessageStore inteface") );
+ " change the node type to one having configuration store implementing the MessageStore interface") );
}
String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE);
if (storeType == null)
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
index 62e545659b..b9356b368b 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java
@@ -29,6 +29,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor;
import org.apache.qpid.server.model.VirtualHostNode;
import org.apache.qpid.server.plugin.MessageStoreFactory;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
@ManagedObject( category = false, type = "STANDARD")
public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost>
@@ -96,7 +97,7 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost
VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class);
if (virtualHostNode.isMessageStoreProvider())
{
- _messageStore = (MessageStore)virtualHostNode.getConfigurationStore();
+ _messageStore = ((MessageStoreProvider)virtualHostNode.getConfigurationStore()).getMessageStore();
}
else
{
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
index f52558e298..c87d24f9c6 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java
@@ -50,6 +50,7 @@ import org.apache.qpid.server.security.SecurityManager;
import org.apache.qpid.server.security.access.Operation;
import org.apache.qpid.server.store.DurableConfigurationStore;
import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.MessageStoreProvider;
public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X>
{
@@ -198,9 +199,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<
deleteVirtualHostIfExists();
close();
deleted();
- if (getConfigurationStore() instanceof MessageStore)
+ if (getConfigurationStore() instanceof MessageStoreProvider)
{
- ((MessageStore)getConfigurationStore()).onDelete();
+ ((MessageStoreProvider)getConfigurationStore()).getMessageStore().onDelete();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java
new file mode 100644
index 0000000000..4cb85dafd4
--- /dev/null
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.virtualhostnode;
+
+import org.apache.qpid.server.model.ManagedObject;
+import org.apache.qpid.server.model.VirtualHostNode;
+import org.apache.qpid.server.store.MessageStore;
+
+@ManagedObject( category = false )
+public interface MessageStoreProvidingVirtualHostNode<X extends MessageStoreProvidingVirtualHostNode<X>>
+ extends VirtualHostNode<X>
+{
+ MessageStore getProvidedMessageStore();
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
new file mode 100644
index 0000000000..8fd3cbb1fe
--- /dev/null
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java
@@ -0,0 +1,47 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+import java.util.Collections;
+import java.util.Map;
+
+public class MemoryMessageStoreTest extends MessageStoreTestCase
+{
+
+ @Override
+ protected Map<String, Object> getStoreSettings() throws Exception
+ {
+ return Collections.<String, Object>emptyMap();
+ }
+
+ @Override
+ protected MessageStore createMessageStore()
+ {
+ return new MemoryMessageStore();
+ }
+
+ @Override
+ protected void reopenStore() throws Exception
+ {
+ // cannot re-open memory message store as it is not persistent
+ }
+
+}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
index bfa4e1d52e..12b21fa964 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java
@@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.server.store.handler.MessageHandler;
-/** A simple message store that stores the messages in a thread-safe structure in memory. */
-public class TestMemoryMessageStore extends AbstractMemoryMessageStore
+/**
+ * A simple message store that stores the messages in a thread-safe structure in memory.
+ */
+public class TestMemoryMessageStore extends MemoryMessageStore
{
public static final String TYPE = "TestMemory";
@@ -34,15 +36,14 @@ public class TestMemoryMessageStore extends AbstractMemoryMessageStore
{
final AtomicInteger counter = new AtomicInteger();
visitMessages(new MessageHandler()
- {
-
- @Override
- public boolean handle(StoredMessage<?> storedMessage)
- {
- counter.incrementAndGet();
- return true;
- }
- });
+ {
+ @Override
+ public boolean handle(StoredMessage<?> storedMessage)
+ {
+ counter.incrementAndGet();
+ return true;
+ }
+ });
return counter.get();
}
diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
index d7c11ea226..df17884495 100644
--- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
+++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java
@@ -49,7 +49,9 @@ public class TestMemoryMessageStoreFactory implements MessageStoreFactory, Durab
@Override
public DurableConfigurationStore createDurableConfigurationStore()
{
- return new TestMemoryMessageStore();
+ return new AbstractMemoryStore()
+ {
+ };
}
@Override