diff options
| author | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2014-06-06 15:43:08 +0000 |
| commit | 39249098b7b374c5e45d7139aa8b9df3aebad385 (patch) | |
| tree | ab13b41b26d2036f5765e3a95b8692fe3903ce54 /qpid/java/broker-core/src | |
| parent | 53fd008b70676ce1382bec414bcd0d86299a4ced (diff) | |
| download | qpid-python-39249098b7b374c5e45d7139aa8b9df3aebad385.tar.gz | |
QPID-5800: [Java Broker} Refactor MessageStore implementations extracting a MessageStoreProvider interface.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1600931 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core/src')
13 files changed, 562 insertions, 414 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index e0c1f77d2b..1db2c89d05 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -46,15 +46,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.log4j.Logger; -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.BrokerModel; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.model.UUIDGenerator; -import org.apache.qpid.server.plugin.MessageMetaDataType; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; import org.codehaus.jackson.JsonGenerationException; import org.codehaus.jackson.JsonGenerator; import org.codehaus.jackson.JsonParseException; @@ -67,7 +58,17 @@ import org.codehaus.jackson.map.ObjectMapper; import org.codehaus.jackson.map.SerializerProvider; import org.codehaus.jackson.map.module.SimpleModule; -abstract public class AbstractJDBCMessageStore implements MessageStore, DurableConfigurationStore +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.BrokerModel; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.model.UUIDGenerator; +import org.apache.qpid.server.plugin.MessageMetaDataType; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; + +abstract public class AbstractJDBCMessageStore implements MessageStoreProvider, DurableConfigurationStore { private static final String DB_VERSION_TABLE_NAME = "QPID_DB_VERSION"; private static final String CONFIGURATION_VERSION_TABLE_NAME = "QPID_CONFIG_VERSION"; @@ -369,7 +370,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) { if (_messageStoreOpen.compareAndSet(false, true)) @@ -952,7 +952,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override public void closeMessageStore() { if (_messageStoreOpen.compareAndSet(true, false)) @@ -978,7 +977,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract void doClose(); - @Override public StoredMessage addMessage(StorableMessageMetaData metaData) { checkMessageStoreOpen(); @@ -1132,7 +1130,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract Connection getConnection() throws SQLException; - @Override public Transaction newTransaction() { checkMessageStoreOpen(); @@ -1665,7 +1662,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } - @Override public boolean isPersistent() { return true; @@ -1975,7 +1971,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void addEventListener(EventListener eventListener, Event... events) { _eventManager.addEventListener(eventListener, events); @@ -2250,7 +2245,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitMessages(MessageHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2301,7 +2295,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2346,7 +2339,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC } } - @Override public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException { checkMessageStoreOpen(); @@ -2447,8 +2439,6 @@ abstract public class AbstractJDBCMessageStore implements MessageStore, DurableC protected abstract void storedSizeChange(int storeSizeIncrease); - - @Override public void onDelete() { // TODO should probably check we are closed diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java index e2c43f5012..e69de29bb2 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryMessageStore.java @@ -1,371 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.server.store; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.atomic.AtomicLong; - -import org.apache.qpid.server.message.EnqueueableMessage; -import org.apache.qpid.server.model.ConfiguredObject; -import org.apache.qpid.server.store.Transaction.Record; -import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; -import org.apache.qpid.server.store.handler.DistributedTransactionHandler; -import org.apache.qpid.server.store.handler.MessageHandler; -import org.apache.qpid.server.store.handler.MessageInstanceHandler; - -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -abstract class AbstractMemoryMessageStore implements MessageStore, DurableConfigurationStore -{ - private final class MemoryMessageStoreTransaction implements Transaction - { - private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>(); - private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>(); - - private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>(); - private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); - - @Override - public StoreFuture commitTranAsync() - { - return StoreFuture.IMMEDIATE_FUTURE; - } - - @Override - public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _localEnqueueMap.put(queue.getId(), messageIds); - } - messageIds.add(message.getMessageNumber()); - } - - @Override - public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) - { - Set<Long> messageIds = _localDequeueMap.get(queue.getId()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _localDequeueMap.put(queue.getId(), messageIds); - } - messageIds.add(message.getMessageNumber()); - } - - @Override - public void commitTran() - { - commitTransactionInternal(this); - _localEnqueueMap.clear(); - _localDequeueMap.clear(); - } - - @Override - public void abortTran() - { - _localEnqueueMap.clear(); - _localDequeueMap.clear(); - } - - @Override - public void removeXid(long format, byte[] globalId, byte[] branchId) - { - _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); - } - - @Override - public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) - { - _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); - } - } - - private final AtomicLong _messageId = new AtomicLong(1); - - private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>(); - - protected ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>(); - - private Object _transactionLock = new Object(); - private Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>(); - private Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>(); - - @SuppressWarnings("unchecked") - @Override - public StoredMessage<StorableMessageMetaData> addMessage(final StorableMessageMetaData metaData) - { - long id = _messageId.getAndIncrement(); - - if(metaData.isPersistent()) - { - return new StoredMemoryMessage(id, metaData) - { - - @Override - public StoreFuture flushToStore() - { - _messages.putIfAbsent(getMessageNumber(), this) ; - return super.flushToStore(); - } - - @Override - public void remove() - { - _messages.remove(getMessageNumber()); - super.remove(); - } - - }; - } - else - { - return new StoredMemoryMessage(id, metaData); - } - } - - private void commitTransactionInternal(MemoryMessageStoreTransaction transaction) - { - synchronized (_transactionLock ) - { - for (Map.Entry<UUID, Set<Long>> loacalEnqueuedEntry : transaction._localEnqueueMap.entrySet()) - { - Set<Long> messageIds = _messageInstances.get(loacalEnqueuedEntry.getKey()); - if (messageIds == null) - { - messageIds = new HashSet<Long>(); - _messageInstances.put(loacalEnqueuedEntry.getKey(), messageIds); - } - messageIds.addAll(loacalEnqueuedEntry.getValue()); - } - - for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet()) - { - Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey()); - if (messageIds != null) - { - messageIds.removeAll(loacalDequeueEntry.getValue()); - if (messageIds.isEmpty()) - { - _messageInstances.remove(loacalDequeueEntry.getKey()); - } - } - } - - for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet()) - { - _distributedTransactions.put(entry.getKey(), entry.getValue()); - } - - for (Xid removed : transaction._localDistributedTransactionsRemoves) - { - _distributedTransactions.remove(removed); - } - - } - - - } - - @Override - public Transaction newTransaction() - { - return new MemoryMessageStoreTransaction(); - } - - @Override - public boolean isPersistent() - { - return false; - } - - @Override - public void addEventListener(EventListener eventListener, Event... events) - { - } - - @Override - public void create(ConfiguredObjectRecord record) - { - if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null) - { - throw new StoreException("Record with id " + record.getId() + " is already present"); - } - } - - @Override - public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) - { - for (ConfiguredObjectRecord record : records) - { - ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record); - if (previousValue == null && !createIfNecessary) - { - throw new StoreException("Record with id " + record.getId() + " does not exist"); - } - } - } - - @Override - public UUID[] remove(final ConfiguredObjectRecord... objects) - { - List<UUID> removed = new ArrayList<UUID>(); - for (ConfiguredObjectRecord record : objects) - { - if (_configuredObjectRecords.remove(record.getId()) != null) - { - removed.add(record.getId()); - } - } - return removed.toArray(new UUID[removed.size()]); - } - - @Override - public void closeConfigurationStore() - { - _configuredObjectRecords.clear(); - } - - @Override - public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) - { - } - - @Override - public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException - { - handler.begin(); - for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) - { - if (!handler.handle(record)) - { - break; - } - } - handler.end(); - } - - @Override - public void openMessageStore(ConfiguredObject<?> parent, Map<String, Object> messageStoreSettings) - { - } - - @Override - public void closeMessageStore() - { - _messages.clear(); - synchronized (_transactionLock) - { - _messageInstances.clear(); - _distributedTransactions.clear(); - } - } - - @Override - public String getStoreLocation() - { - return null; - } - - @Override - public void onDelete() - { - } - - @Override - public void visitMessages(MessageHandler handler) throws StoreException - { - for (StoredMemoryMessage message : _messages.values()) - { - if(!handler.handle(message)) - { - break; - } - } - } - - @Override - public void visitMessageInstances(MessageInstanceHandler handler) throws StoreException - { - synchronized (_transactionLock) - { - for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) - { - UUID resourceId = enqueuedEntry.getKey(); - for (Long messageId : enqueuedEntry.getValue()) - { - if (!handler.handle(resourceId, messageId)) - { - return; - } - } - } - } - } - - @Override - public void visitDistributedTransactions(DistributedTransactionHandler handler) throws StoreException - { - synchronized (_transactionLock) - { - for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) - { - Xid xid = entry.getKey(); - DistributedTransactionRecords records = entry.getValue(); - if (!handler.handle(xid.getFormat(), xid.getGlobalId(), xid.getBranchId(), records.getEnqueues(), records.getDequeues())) - { - break; - } - } - } - } - - private static final class DistributedTransactionRecords - { - private Record[] _enqueues; - private Record[] _dequeues; - - public DistributedTransactionRecords(Record[] enqueues, Record[] dequeues) - { - super(); - _enqueues = enqueues; - _dequeues = dequeues; - } - - public Record[] getEnqueues() - { - return _enqueues; - } - - public Record[] getDequeues() - { - return _dequeues; - } - } -} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java new file mode 100644 index 0000000000..267e1d9cb3 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractMemoryStore.java @@ -0,0 +1,109 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; + +abstract class AbstractMemoryStore implements DurableConfigurationStore, MessageStoreProvider +{ + private final MessageStore _messageStore = new MemoryMessageStore(); + + + private final ConcurrentHashMap<UUID, ConfiguredObjectRecord> _configuredObjectRecords = new ConcurrentHashMap<UUID, ConfiguredObjectRecord>(); + + + + @Override + public void create(ConfiguredObjectRecord record) + { + if (_configuredObjectRecords.putIfAbsent(record.getId(), record) != null) + { + throw new StoreException("Record with id " + record.getId() + " is already present"); + } + } + + @Override + public void update(boolean createIfNecessary, ConfiguredObjectRecord... records) + { + for (ConfiguredObjectRecord record : records) + { + ConfiguredObjectRecord previousValue = _configuredObjectRecords.replace(record.getId(), record); + if (previousValue == null && !createIfNecessary) + { + throw new StoreException("Record with id " + record.getId() + " does not exist"); + } + } + } + + @Override + public UUID[] remove(final ConfiguredObjectRecord... objects) + { + List<UUID> removed = new ArrayList<UUID>(); + for (ConfiguredObjectRecord record : objects) + { + if (_configuredObjectRecords.remove(record.getId()) != null) + { + removed.add(record.getId()); + } + } + return removed.toArray(new UUID[removed.size()]); + } + + @Override + public void closeConfigurationStore() + { + _configuredObjectRecords.clear(); + } + + @Override + public void openConfigurationStore(ConfiguredObject<?> parent, Map<String, Object> storeSettings) + { + } + + @Override + public void visitConfiguredObjectRecords(ConfiguredObjectRecordHandler handler) throws StoreException + { + handler.begin(); + for (ConfiguredObjectRecord record : _configuredObjectRecords.values()) + { + if (!handler.handle(record)) + { + break; + } + } + handler.end(); + } + + @Override + public MessageStore getMessageStore() + { + return _messageStore; + } + + +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java new file mode 100644 index 0000000000..44d640ca86 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java @@ -0,0 +1,308 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.qpid.server.message.EnqueueableMessage; +import org.apache.qpid.server.model.ConfiguredObject; +import org.apache.qpid.server.store.handler.DistributedTransactionHandler; +import org.apache.qpid.server.store.handler.MessageHandler; +import org.apache.qpid.server.store.handler.MessageInstanceHandler; + +/** A simple message store that stores the messages in a thread-safe structure in memory. */ +public class MemoryMessageStore implements MessageStore +{ + private final AtomicLong _messageId = new AtomicLong(1); + + private final ConcurrentHashMap<Long, StoredMemoryMessage> _messages = new ConcurrentHashMap<Long, StoredMemoryMessage>(); + private final Object _transactionLock = new Object(); + private final Map<UUID, Set<Long>> _messageInstances = new HashMap<UUID, Set<Long>>(); + private final Map<Xid, DistributedTransactionRecords> _distributedTransactions = new HashMap<Xid, DistributedTransactionRecords>(); + + + private final class MemoryMessageStoreTransaction implements Transaction + { + private Map<UUID, Set<Long>> _localEnqueueMap = new HashMap<UUID, Set<Long>>(); + private Map<UUID, Set<Long>> _localDequeueMap = new HashMap<UUID, Set<Long>>(); + + private Map<Xid, DistributedTransactionRecords> _localDistributedTransactionsRecords = new HashMap<Xid, DistributedTransactionRecords>(); + private Set<Xid> _localDistributedTransactionsRemoves = new HashSet<Xid>(); + + @Override + public StoreFuture commitTranAsync() + { + return StoreFuture.IMMEDIATE_FUTURE; + } + + @Override + public void enqueueMessage(TransactionLogResource queue, EnqueueableMessage message) + { + Set<Long> messageIds = _localEnqueueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _localEnqueueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); + } + + @Override + public void dequeueMessage(TransactionLogResource queue, EnqueueableMessage message) + { + Set<Long> messageIds = _localDequeueMap.get(queue.getId()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _localDequeueMap.put(queue.getId(), messageIds); + } + messageIds.add(message.getMessageNumber()); + } + + @Override + public void commitTran() + { + commitTransactionInternal(this); + _localEnqueueMap.clear(); + _localDequeueMap.clear(); + } + + @Override + public void abortTran() + { + _localEnqueueMap.clear(); + _localDequeueMap.clear(); + } + + @Override + public void removeXid(long format, byte[] globalId, byte[] branchId) + { + _localDistributedTransactionsRemoves.add(new Xid(format, globalId, branchId)); + } + + @Override + public void recordXid(long format, byte[] globalId, byte[] branchId, Record[] enqueues, Record[] dequeues) + { + _localDistributedTransactionsRecords.put(new Xid(format, globalId, branchId), new DistributedTransactionRecords(enqueues, dequeues)); + } + } + + private static final class DistributedTransactionRecords + { + private Transaction.Record[] _enqueues; + private Transaction.Record[] _dequeues; + + public DistributedTransactionRecords(Transaction.Record[] enqueues, Transaction.Record[] dequeues) + { + super(); + _enqueues = enqueues; + _dequeues = dequeues; + } + + public Transaction.Record[] getEnqueues() + { + return _enqueues; + } + + public Transaction.Record[] getDequeues() + { + return _dequeues; + } + } + + private void commitTransactionInternal(MemoryMessageStoreTransaction transaction) + { + synchronized (_transactionLock ) + { + for (Map.Entry<UUID, Set<Long>> localEnqueuedEntry : transaction._localEnqueueMap.entrySet()) + { + Set<Long> messageIds = _messageInstances.get(localEnqueuedEntry.getKey()); + if (messageIds == null) + { + messageIds = new HashSet<Long>(); + _messageInstances.put(localEnqueuedEntry.getKey(), messageIds); + } + messageIds.addAll(localEnqueuedEntry.getValue()); + } + + for (Map.Entry<UUID, Set<Long>> loacalDequeueEntry : transaction._localDequeueMap.entrySet()) + { + Set<Long> messageIds = _messageInstances.get(loacalDequeueEntry.getKey()); + if (messageIds != null) + { + messageIds.removeAll(loacalDequeueEntry.getValue()); + if (messageIds.isEmpty()) + { + _messageInstances.remove(loacalDequeueEntry.getKey()); + } + } + } + + for (Map.Entry<Xid, DistributedTransactionRecords> entry : transaction._localDistributedTransactionsRecords.entrySet()) + { + _distributedTransactions.put(entry.getKey(), entry.getValue()); + } + + for (Xid removed : transaction._localDistributedTransactionsRemoves) + { + _distributedTransactions.remove(removed); + } + + } + + + } + + + @Override + public void openMessageStore(final ConfiguredObject<?> parent, final Map<String, Object> messageStoreSettings) + { + } + + @Override + public <T extends StorableMessageMetaData> StoredMessage<T> addMessage(final T metaData) + { + long id = _messageId.getAndIncrement(); + + if(metaData.isPersistent()) + { + return new StoredMemoryMessage<T>(id, metaData) + { + + @Override + public StoreFuture flushToStore() + { + _messages.putIfAbsent(getMessageNumber(), this) ; + return super.flushToStore(); + } + + @Override + public void remove() + { + _messages.remove(getMessageNumber()); + super.remove(); + } + + }; + } + else + { + return new StoredMemoryMessage<T>(id, metaData); + } + } + + @Override + public boolean isPersistent() + { + return false; + } + + @Override + public Transaction newTransaction() + { + return new MemoryMessageStoreTransaction(); + } + + @Override + public void closeMessageStore() + { + _messages.clear(); + synchronized (_transactionLock) + { + _messageInstances.clear(); + _distributedTransactions.clear(); + } + } + + @Override + public void addEventListener(final EventListener eventListener, final Event... events) + { + } + + @Override + public String getStoreLocation() + { + return null; + } + + @Override + public void onDelete() + { + } + + @Override + public void visitMessages(final MessageHandler handler) throws StoreException + { + for (StoredMemoryMessage message : _messages.values()) + { + if(!handler.handle(message)) + { + break; + } + } + } + + @Override + public void visitMessageInstances(final MessageInstanceHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry<UUID, Set<Long>> enqueuedEntry : _messageInstances.entrySet()) + { + UUID resourceId = enqueuedEntry.getKey(); + for (Long messageId : enqueuedEntry.getValue()) + { + if (!handler.handle(resourceId, messageId)) + { + return; + } + } + } + } + } + + @Override + public void visitDistributedTransactions(final DistributedTransactionHandler handler) throws StoreException + { + synchronized (_transactionLock) + { + for (Map.Entry<Xid, DistributedTransactionRecords> entry : _distributedTransactions.entrySet()) + { + Xid xid = entry.getKey(); + DistributedTransactionRecords records = entry.getValue(); + if (!handler.handle(xid.getFormat(), + xid.getGlobalId(), + xid.getBranchId(), + records.getEnqueues(), + records.getDequeues())) + { + break; + } + } + } + } +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java new file mode 100644 index 0000000000..94d58013d2 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/MessageStoreProvider.java @@ -0,0 +1,26 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +public interface MessageStoreProvider +{ + MessageStore getMessageStore(); +} diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java index e7302270bb..f4e1376980 100755 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/StoredMemoryMessage.java @@ -23,13 +23,13 @@ package org.apache.qpid.server.store; import java.nio.ByteBuffer; -public class StoredMemoryMessage implements StoredMessage +public class StoredMemoryMessage<T extends StorableMessageMetaData> implements StoredMessage<T> { private final long _messageNumber; private ByteBuffer _content; - private final StorableMessageMetaData _metaData; + private final T _metaData; - public StoredMemoryMessage(long messageNumber, StorableMessageMetaData metaData) + public StoredMemoryMessage(long messageNumber, T metaData) { _messageNumber = messageNumber; _metaData = metaData; @@ -128,7 +128,7 @@ public class StoredMemoryMessage implements StoredMessage } - public StorableMessageMetaData getMetaData() + public T getMetaData() { return _metaData; } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java index d27cd1c13e..9e10d5e424 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/AbstractVirtualHost.java @@ -77,6 +77,7 @@ import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.Event; import org.apache.qpid.server.store.EventListener; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; import org.apache.qpid.server.store.StoreException; import org.apache.qpid.server.store.handler.ConfiguredObjectRecordHandler; import org.apache.qpid.server.txn.DtxRegistry; @@ -196,13 +197,14 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte DurableConfigurationStore durableConfigurationStore = _virtualHostNode.getConfigurationStore(); + // TODO attribute messageStoreProvider is to be removed boolean nodeIsMessageStoreProvider = _virtualHostNode.isMessageStoreProvider(); if (nodeIsMessageStoreProvider) { - if (!(durableConfigurationStore instanceof MessageStore)) + if (!(durableConfigurationStore instanceof MessageStoreProvider)) { throw new IllegalConfigurationException("Virtual host node " + _virtualHostNode.getName() - + " is configured as a provider of message store but the MessageStore interface is not implemented on a configuration store of type " + + " is configured as a provider of message store but the MessageStoreProvider interface is not implemented on a configuration store of type " + durableConfigurationStore.getClass().getName()); } } @@ -215,7 +217,7 @@ public abstract class AbstractVirtualHost<X extends AbstractVirtualHost<X>> exte + ". You can either configure the message store setting on the host or " + (durableConfigurationStore instanceof MessageStore ? " configure VirtualHostNode " + _virtualHostNode.getName() + " as a provider of message store" : - " change the node type to one having configuration store implementing the MessageStore inteface") ); + " change the node type to one having configuration store implementing the MessageStore interface") ); } String storeType = (String) messageStoreSettings.get(MessageStore.STORE_TYPE); if (storeType == null) diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java index 62e545659b..b9356b368b 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhost/StandardVirtualHost.java @@ -29,6 +29,7 @@ import org.apache.qpid.server.model.ManagedObjectFactoryConstructor; import org.apache.qpid.server.model.VirtualHostNode; import org.apache.qpid.server.plugin.MessageStoreFactory; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; @ManagedObject( category = false, type = "STANDARD") public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost> @@ -96,7 +97,7 @@ public class StandardVirtualHost extends AbstractVirtualHost<StandardVirtualHost VirtualHostNode<?> virtualHostNode = getParent(VirtualHostNode.class); if (virtualHostNode.isMessageStoreProvider()) { - _messageStore = (MessageStore)virtualHostNode.getConfigurationStore(); + _messageStore = ((MessageStoreProvider)virtualHostNode.getConfigurationStore()).getMessageStore(); } else { diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java index f52558e298..c87d24f9c6 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractVirtualHostNode.java @@ -50,6 +50,7 @@ import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.security.access.Operation; import org.apache.qpid.server.store.DurableConfigurationStore; import org.apache.qpid.server.store.MessageStore; +import org.apache.qpid.server.store.MessageStoreProvider; public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode<X>> extends AbstractConfiguredObject<X> implements VirtualHostNode<X> { @@ -198,9 +199,9 @@ public abstract class AbstractVirtualHostNode<X extends AbstractVirtualHostNode< deleteVirtualHostIfExists(); close(); deleted(); - if (getConfigurationStore() instanceof MessageStore) + if (getConfigurationStore() instanceof MessageStoreProvider) { - ((MessageStore)getConfigurationStore()).onDelete(); + ((MessageStoreProvider)getConfigurationStore()).getMessageStore().onDelete(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java new file mode 100644 index 0000000000..4cb85dafd4 --- /dev/null +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/MessageStoreProvidingVirtualHostNode.java @@ -0,0 +1,32 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.virtualhostnode; + +import org.apache.qpid.server.model.ManagedObject; +import org.apache.qpid.server.model.VirtualHostNode; +import org.apache.qpid.server.store.MessageStore; + +@ManagedObject( category = false ) +public interface MessageStoreProvidingVirtualHostNode<X extends MessageStoreProvidingVirtualHostNode<X>> + extends VirtualHostNode<X> +{ + MessageStore getProvidedMessageStore(); +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java new file mode 100644 index 0000000000..8fd3cbb1fe --- /dev/null +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/MemoryMessageStoreTest.java @@ -0,0 +1,47 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.store; + +import java.util.Collections; +import java.util.Map; + +public class MemoryMessageStoreTest extends MessageStoreTestCase +{ + + @Override + protected Map<String, Object> getStoreSettings() throws Exception + { + return Collections.<String, Object>emptyMap(); + } + + @Override + protected MessageStore createMessageStore() + { + return new MemoryMessageStore(); + } + + @Override + protected void reopenStore() throws Exception + { + // cannot re-open memory message store as it is not persistent + } + +} diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java index bfa4e1d52e..12b21fa964 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStore.java @@ -25,8 +25,10 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.qpid.server.store.handler.MessageHandler; -/** A simple message store that stores the messages in a thread-safe structure in memory. */ -public class TestMemoryMessageStore extends AbstractMemoryMessageStore +/** + * A simple message store that stores the messages in a thread-safe structure in memory. + */ +public class TestMemoryMessageStore extends MemoryMessageStore { public static final String TYPE = "TestMemory"; @@ -34,15 +36,14 @@ public class TestMemoryMessageStore extends AbstractMemoryMessageStore { final AtomicInteger counter = new AtomicInteger(); visitMessages(new MessageHandler() - { - - @Override - public boolean handle(StoredMessage<?> storedMessage) - { - counter.incrementAndGet(); - return true; - } - }); + { + @Override + public boolean handle(StoredMessage<?> storedMessage) + { + counter.incrementAndGet(); + return true; + } + }); return counter.get(); } diff --git a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java index d7c11ea226..df17884495 100644 --- a/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java +++ b/qpid/java/broker-core/src/test/java/org/apache/qpid/server/store/TestMemoryMessageStoreFactory.java @@ -49,7 +49,9 @@ public class TestMemoryMessageStoreFactory implements MessageStoreFactory, Durab @Override public DurableConfigurationStore createDurableConfigurationStore() { - return new TestMemoryMessageStore(); + return new AbstractMemoryStore() + { + }; } @Override |
