diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 10:41:14 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2014-07-28 10:41:14 +0000 |
| commit | b877fef68695499fc63c1d0aef19cd1415981052 (patch) | |
| tree | d3cf766e3c9641c3cc0e1841d95e9686b2ce81ed /qpid/java/broker-core | |
| parent | b71bbd227dfacaedaba411e908853b05e8fbd243 (diff) | |
| download | qpid-python-b877fef68695499fc63c1d0aef19cd1415981052.tar.gz | |
QPID-5930 : [Java Broker] Minimize memory footprint for persistent messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613950 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
7 files changed, 423 insertions, 110 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 556cfcb266..198c0a1cb9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> { private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; private final Object _connectionReference; @@ -113,7 +113,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI @Override final public MessageReference<X> newReference() { - return new Reference(); + return new Reference(this); } @Override @@ -148,26 +148,32 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI return "Message[" + debugIdentity() + "]"; } - private final class Reference implements MessageReference<X> + private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> + implements MessageReference<X> { - private final AtomicBoolean _released = new AtomicBoolean(false); + private static final AtomicIntegerFieldUpdater<Reference> _releasedUpdater = + AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); + + private AbstractServerMessageImpl<X, T> _message; + private volatile int _released; - private Reference() + private Reference(final AbstractServerMessageImpl<X, T> message) { - incrementReference(); + _message = message; + _message.incrementReference(); } public X getMessage() { - return (X) AbstractServerMessageImpl.this; + return (X) _message; } - public void release() + public synchronized void release() { - if(!_released.getAndSet(true)) + if(_releasedUpdater.compareAndSet(this,0,1)) { - decrementReference(); + _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java index 1d7b8627f4..af9a252077 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java @@ -92,5 +92,7 @@ public interface InstanceProperties return _props.get(prop); } } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 83e2cc06ec..49d3fea8fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -1339,6 +1339,14 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return converter.convert("${" + propertyName + "}", this); } + @Override + public Set<String> getContextKeys() + { + Map<String,String> inheritedContext = new HashMap<>(); + generateInheritedContext(getModel(), this, inheritedContext); + return Collections.unmodifiableSet(inheritedContext.keySet()); + } + private OwnAttributeResolver getOwnAttributeResolver() { return _attributeResolver; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 2301f23773..01ca2fa646 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.security.AccessControlException; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -78,6 +79,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> <T> T getContextValue(Class<T> clazz, String propertyName); + Set<String> getContextKeys(); + @DerivedAttribute( persist = true ) String getLastUpdatedBy(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6fa7801608..e6cde6c934 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.EnumMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -80,13 +79,17 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile long _entryId; - private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties(); + private static int REDELIVERED_FLAG = 1; + private static int PERSISTENT_FLAG = 2; + private static int MANDATORY_FLAG = 4; + private static int IMMEDIATE_FLAG = 8; + private int _flags; + private long _expiration; /** Number of times this message has been delivered */ - private volatile int _deliveryCount = 0; + private volatile int _deliveryCount = -1; private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater .newUpdater(QueueEntryImpl.class, "_deliveryCount"); - private boolean _deliveredToConsumer; public QueueEntryImpl(QueueEntryList queueEntryList) @@ -117,14 +120,17 @@ public abstract class QueueEntryImpl implements QueueEntry { if(_message != null) { - _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent()); - _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration()); + if(_message.getMessage().isPersistent()) + { + setPersistent(); + } + _expiration = _message.getMessage().getExpiration(); } } public InstanceProperties getInstanceProperties() { - return _instanceProperties; + return new EntryInstanceProperties(); } protected void setEntryId(long entryId) @@ -154,21 +160,17 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return _deliveredToConsumer; + return _deliveryCountUpdater.get(this) != -1; } public boolean expired() { - ServerMessage message = getMessage(); - if(message != null) + long expiration = _expiration; + if (expiration != 0L) { - long expiration = message.getExpiration(); - if (expiration != 0L) - { - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - return (now > expiration); - } + return (now > expiration); } return false; @@ -206,7 +208,7 @@ public abstract class QueueEntryImpl implements QueueEntry final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); if(acquired) { - _deliveredToConsumer = true; + _deliveryCountUpdater.compareAndSet(this,-1,0); } return acquired; } @@ -253,15 +255,6 @@ public abstract class QueueEntryImpl implements QueueEntry } - public void setRedelivered() - { - _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE); - } - - public boolean isRedelivered() - { - return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); - } public QueueConsumer getDeliveredConsumer() { @@ -459,7 +452,7 @@ public abstract class QueueEntryImpl implements QueueEntry public int getDeliveryCount() { - return _deliveryCount; + return _deliveryCount == -1 ? 0 : _deliveryCount; } @Override @@ -470,6 +463,7 @@ public abstract class QueueEntryImpl implements QueueEntry public void incrementDeliveryCount() { + _deliveryCountUpdater.compareAndSet(this,-1,0); _deliveryCountUpdater.incrementAndGet(this); } @@ -509,20 +503,45 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue(); } - private static class EntryInstanceProperties implements InstanceProperties + public void setRedelivered() + { + _flags |= REDELIVERED_FLAG; + } + + private void setPersistent() + { + _flags |= PERSISTENT_FLAG; + } + + public boolean isRedelivered() + { + return (_flags & REDELIVERED_FLAG) != 0; + } + + private class EntryInstanceProperties implements InstanceProperties { - private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class); @Override public Object getProperty(final Property prop) { - return _properties.get(prop); - } + switch(prop) + { - private void setProperty(Property prop, Object value) - { - _properties.put(prop, value); + case REDELIVERED: + return (_flags & REDELIVERED_FLAG) != 0; + case PERSISTENT: + return (_flags & PERSISTENT_FLAG) != 0; + case MANDATORY: + return (_flags & MANDATORY_FLAG) != 0; + case IMMEDIATE: + return (_flags & IMMEDIATE_FLAG) != 0; + case EXPIRATION: + return _expiration; + default: + throw new IllegalArgumentException("Unknown property " + prop); + } } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index f2f85e1387..1a1085339d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -716,8 +716,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } - private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, - Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException + private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, + Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException { Connection conn = connWrapper.getConnection(); @@ -738,6 +738,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore stmt.close(); } + List<Runnable> postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredJDBCMessage) + { + postActions.add(((StoredJDBCMessage) storedMessage).store(conn)); + } + } + + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); try @@ -773,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { stmt.close(); } - + return postActions; } catch (SQLException e) { @@ -1105,6 +1116,47 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } + + private byte[] getAllContent(long messageId) + { + Connection conn = null; + PreparedStatement stmt = null; + + try + { + conn = newAutoCommitConnection(); + + stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + ResultSet rs = stmt.executeQuery(); + + int written = 0; + + if (rs.next()) + { + + byte[] dataAsBytes = getBlobAsBytes(rs, 1); + return dataAsBytes; + } + + throw new StoreException("No such message, id: " + messageId); + + } + catch (SQLException e) + { + throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closeConnection(conn, getLogger()); + } + + + } + + + @Override public boolean isPersistent() { @@ -1116,7 +1168,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - private final List<Runnable> _onCommitActions = new ArrayList<>(); + private final List<Runnable> _preCommitActions = new ArrayList<>(); + private final List<Runnable> _postCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1138,19 +1191,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - _onCommitActions.add(new Runnable() + _preCommitActions.add(new Runnable() { @Override public void run() { try { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _postCommitActions.add(((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection())); _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } catch (SQLException e) { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + throw new StoreException("Exception on enqueuing message into message store" + _messageId, + e); } } }); @@ -1174,6 +1228,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); } @Override @@ -1183,23 +1238,33 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); return storeFuture; } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override public void abortTran() { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1216,56 +1281,171 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues)); } } - private class StoredJDBCMessage implements StoredMessage + + static interface MessageDataRef<T extends StorableMessageMetaData> { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; - private StorableMessageMetaData _metaData; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T> + { + private final T _metaData; private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData<T extends StorableMessageMetaData> + { + private T _metaData; + private SoftReference<byte[]> _data; + + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T> + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData<T>(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData<T> ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData<T> ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData<T> ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + + private volatile MessageDataRef<T> _messageDataRef; - StoredJDBCMessage(long messageId, StorableMessageMetaData metaData) + StoredJDBCMessage(long messageId, T metaData) { this(messageId, metaData, false); } StoredJDBCMessage(long messageId, - StorableMessageMetaData metaData, boolean isRecovered) + T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } @Override - public StorableMessageMetaData getMetaData() + public T getMetaData() { - StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; + T metaData = _messageDataRef.getMetaData(); if(metaData == null) { checkMessageStoreOpen(); try { - metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId); + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } catch (SQLException e) { throw new StoreException(e); } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } return metaData; @@ -1281,21 +1461,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); + byte[] data = _messageDataRef.getData(); - if(_data == null) + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + System.arraycopy(oldData,0,data,0,oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1303,34 +1485,90 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else + byte[] data = _messageDataRef.getData(); + + if(data == null) { - checkMessageStoreOpen(); - return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst); + if(stored()) + { + checkMessageStoreOpen(); + getLogger().debug("GET CONTENT for message id " + _messageId); + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.position(0); - buf.limit(length); - return buf; + byte[] data = _messageDataRef.getData(); + + if(data == null) + { + + if(stored()) + { + checkMessageStoreOpen(); + + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } + } + return ByteBuffer.wrap(data,offsetInMessage,size); + } @Override public void remove() { + getLogger().debug("REMOVE called on message: " + _messageId); checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); @@ -1338,32 +1576,69 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } - private synchronized void store(final Connection conn) throws SQLException + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) { - try + getLogger().debug("STORING message id " + _messageId); + storeMetaData(conn, _messageId, _messageDataRef.getMetaData()); + AbstractJDBCMessageStore.this.addContent(conn, _messageId, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + if(getLogger().isDebugEnabled()) { - storeMetaData(conn, _messageId, _metaData); - AbstractJDBCMessageStore.this.addContent(conn, _messageId, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + getLogger().debug("Storing message " + _messageId + " to store"); } - finally + + MessageDataRef<T> hardRef = _messageDataRef; + MessageDataSoftRef<T> messageDataSoftRef; + MessageData<T> ref; + do { - _metaData = null; - _data = null; + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } + while (ref == null); - if(getLogger().isDebugEnabled()) + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - getLogger().debug("Storing message " + _messageId + " to store"); + private MessageData<T> _ref; + + Pointer(final MessageData<T> ref) + { + getLogger().debug("POST COMMIT for message id " + _messageId); + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index fbe402c4a1..df9ffda3a3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -90,7 +90,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard if (host == null) { - boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); + boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) && Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); |
