diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 10:41:14 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 10:41:14 +0000 |
| commit | b877fef68695499fc63c1d0aef19cd1415981052 (patch) | |
| tree | d3cf766e3c9641c3cc0e1841d95e9686b2ce81ed /qpid/java/bdbstore/src | |
| parent | b71bbd227dfacaedaba411e908853b05e8fbd243 (diff) | |
| download | qpid-python-b877fef68695499fc63c1d0aef19cd1415981052.tar.gz | |
QPID-5930 : [Java Broker] Minimize memory footprint for persistent messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613950 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/bdbstore/src')
6 files changed, 329 insertions, 81 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index be592a0d42..7e5f5bbb3f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -558,6 +558,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + byte[] getAllContent(long messageId) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message Id: " + messageId + " Getting content body"); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + return contentTupleBinding.entryToObject(value); + } + else + { + throw new StoreException("Unable to find message with id " + messageId); + } + + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) { Cursor cursor = null; @@ -810,12 +847,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recordXid(Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + private List<Runnable> recordXid(Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -826,10 +863,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); + List<Runnable> postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredBDBMessage) + { + postActions.add(((StoredBDBMessage) storedMessage).store(txn)); + } + } try { getXidDb().put(txn, key, value); + return postActions; } catch (DatabaseException e) { @@ -1041,17 +1088,127 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + static interface MessageDataRef<T extends StorableMessageMetaData> { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; + private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T> + { + private final T _metaData; + private byte[] _data; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData<T extends StorableMessageMetaData> + { private T _metaData; - private volatile SoftReference<T> _metaDataRef; + private SoftReference<byte[]> _data; - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T> + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData<T>(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData<T> ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData<T> ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData<T> ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + + private volatile MessageDataRef<T> _messageDataRef; StoredBDBMessage(long messageId, T metaData) { @@ -1061,27 +1218,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore StoredBDBMessage(long messageId, T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference<T>(metaData); } @Override public T getMetaData() { - T metaData = _metaDataRef.get(); + T metaData = _messageDataRef.getMetaData(); + if(metaData == null) { checkMessageStoreOpen(); - metaData = (T) getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<T>(metaData); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } - return metaData; } @@ -1095,21 +1253,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); - - if(_data == null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + + System.arraycopy(oldData, 0, data, 0, oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData, 0, _data, 0, oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1117,55 +1277,116 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } - else - { - checkMessageStoreOpen(); - return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else + byte[] data = _messageDataRef.getData(); + if(data == null) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + return ByteBuffer.wrap(data,offsetInMessage,size); + } - synchronized void store(Transaction txn) + synchronized Runnable store(Transaction txn) { if (!stored()) { - try + + AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData()); + AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + MessageDataRef<T> hardRef = _messageDataRef; + MessageDataSoftRef<T> messageDataSoftRef; + MessageData<T> ref; + do { - _dataRef = new SoftReference<byte[]>(_data); - AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } - finally + while (ref == null); + + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - _metaData = null; - _data = null; + private MessageData<T> _ref; + + Pointer(final MessageData<T> ref) + { + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } @@ -1205,7 +1426,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } @Override @@ -1220,7 +1441,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; - private final List<Runnable> _onCommitActions = new ArrayList<>(); + private final List<Runnable> _preCommitActions = new ArrayList<>(); + private final List<Runnable> _postCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1242,13 +1464,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - _onCommitActions.add(new Runnable() + final long contentSize = storedMessage.getMetaData().getContentSize(); + _preCommitActions.add(new Runnable() { @Override public void run() { - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _postCommitActions.add(storedMessage.store(_txn)); + _storeSizeIncrease += contentSize; } }); @@ -1271,16 +1494,26 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + doPostCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override @@ -1289,14 +1522,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + doPostCommitActions(); + return storeFuture; } @Override public void abortTran() throws StoreException { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); + _postCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } @@ -1314,7 +1550,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index ab0d1ab9ef..4c3b72a29b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import com.sleepycat.je.config.ConfigParam; import com.sleepycat.je.config.EnvironmentParams; @@ -65,16 +66,15 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor private Map<String, String> buildEnvironmentConfiguration(ConfiguredObject<?> parent) { - final Map<String, String> context = parent.getContext(); Map<String, String> envConfigMap = new HashMap<>(); for (ConfigParam cp : EnvironmentParams.SUPPORTED_PARAMS.values()) { final String parameterName = cp.getName(); - if (context.containsKey(parameterName) && !cp.isForReplication()) + Set<String> contextKeys = parent.getContextKeys(); + if (!cp.isForReplication() && contextKeys.contains(parameterName)) { - String contextValue = context.get(parameterName); - envConfigMap.put(parameterName, contextValue); + envConfigMap.put(parameterName, parent.getContextValue(String.class, parameterName)); } } return Collections.unmodifiableMap(envConfigMap); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 4f21baf42f..37d53319b5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -126,11 +126,11 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact private Map<String, String> buildConfig(ConfiguredObject<?> parent, Pattern paramName) { Map<String, String> targetMap = new HashMap<>(); - for (String name : parent.getContext().keySet()) + for (String name : parent.getContextKeys()) { if (paramName.matcher(name).matches()) { - String contextValue = parent.getContext().get(name); + String contextValue = parent.getContextValue(String.class,name); targetMap.put(name, contextValue); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java index cc0032845f..12511ad9e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualhost.berkeleydb; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.SizeMonitoringSettings; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -31,6 +32,12 @@ public interface BDBVirtualHost<X extends BDBVirtualHost<X>> extends VirtualHost String STORE_PATH = "storePath"; + // Default the JE cache to 5% of total memory, but no less than 10Mb and no more than 200Mb + @ManagedContextDefault(name="je.maxMemory") + long DEFAULT_JE_CACHE_SIZE = Math.max(10l*1024l*1024l, + Math.min(200l*1024l*1024l, + Runtime.getRuntime().maxMemory()/20l)); + @ManagedAttribute(mandatory = true) String getStorePath(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 388e2e7608..061fa12768 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -418,7 +418,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu LOGGER.debug("Creating new virtualhost with name : " + getGroupName()); } - boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); + boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) && Boolean.parseBoolean(String.valueOf(getContext().get( VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 847f857491..1c5b705fa0 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; +import java.util.Map; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { /* @@ -60,7 +62,10 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB protected VirtualHost createVirtualHost(String storeLocation) { final BDBVirtualHost parent = mock(BDBVirtualHost.class); - when(parent.getContext()).thenReturn(Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE)); + Map<String, String> contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); + when(parent.getContext()).thenReturn(contextMap); + when(parent.getContextKeys()).thenReturn(contextMap.keySet()); + when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE); when(parent.getStorePath()).thenReturn(storeLocation); when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE); when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE); |
