summaryrefslogtreecommitdiff
path: root/qpid/java/broker-core
diff options
context:
space:
mode:
authorRobert Godfrey <rgodfrey@apache.org>2014-07-28 10:41:14 +0000
committerRobert Godfrey <rgodfrey@apache.org>2014-07-28 10:41:14 +0000
commitb877fef68695499fc63c1d0aef19cd1415981052 (patch)
treed3cf766e3c9641c3cc0e1841d95e9686b2ce81ed /qpid/java/broker-core
parentb71bbd227dfacaedaba411e908853b05e8fbd243 (diff)
downloadqpid-python-b877fef68695499fc63c1d0aef19cd1415981052.tar.gz
QPID-5930 : [Java Broker] Minimize memory footprint for persistent messages
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1613950 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java/broker-core')
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java32
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java2
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java8
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java3
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java87
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java399
-rw-r--r--qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java2
7 files changed, 423 insertions, 110 deletions
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
index 556cfcb266..198c0a1cb9 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java
@@ -20,20 +20,20 @@
*/
package org.apache.qpid.server.message;
+import java.nio.ByteBuffer;
+import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
+
import org.apache.qpid.server.store.StorableMessageMetaData;
import org.apache.qpid.server.store.StoredMessage;
import org.apache.qpid.server.util.ServerScopedRuntimeException;
-import java.nio.ByteBuffer;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicIntegerFieldUpdater;
-
public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T>
{
private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater =
AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount");
+
private volatile int _referenceCount = 0;
private final StoredMessage<T> _handle;
private final Object _connectionReference;
@@ -113,7 +113,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
@Override
final public MessageReference<X> newReference()
{
- return new Reference();
+ return new Reference(this);
}
@Override
@@ -148,26 +148,32 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI
return "Message[" + debugIdentity() + "]";
}
- private final class Reference implements MessageReference<X>
+ private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData>
+ implements MessageReference<X>
{
- private final AtomicBoolean _released = new AtomicBoolean(false);
+ private static final AtomicIntegerFieldUpdater<Reference> _releasedUpdater =
+ AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released");
+
+ private AbstractServerMessageImpl<X, T> _message;
+ private volatile int _released;
- private Reference()
+ private Reference(final AbstractServerMessageImpl<X, T> message)
{
- incrementReference();
+ _message = message;
+ _message.incrementReference();
}
public X getMessage()
{
- return (X) AbstractServerMessageImpl.this;
+ return (X) _message;
}
- public void release()
+ public synchronized void release()
{
- if(!_released.getAndSet(true))
+ if(_releasedUpdater.compareAndSet(this,0,1))
{
- decrementReference();
+ _message.decrementReference();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
index 1d7b8627f4..af9a252077 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java
@@ -92,5 +92,7 @@ public interface InstanceProperties
return _props.get(prop);
}
}
+
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
index 83e2cc06ec..49d3fea8fd 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java
@@ -1339,6 +1339,14 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im
return converter.convert("${" + propertyName + "}", this);
}
+ @Override
+ public Set<String> getContextKeys()
+ {
+ Map<String,String> inheritedContext = new HashMap<>();
+ generateInheritedContext(getModel(), this, inheritedContext);
+ return Collections.unmodifiableSet(inheritedContext.keySet());
+ }
+
private OwnAttributeResolver getOwnAttributeResolver()
{
return _attributeResolver;
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
index 2301f23773..01ca2fa646 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java
@@ -23,6 +23,7 @@ package org.apache.qpid.server.model;
import java.security.AccessControlException;
import java.util.Collection;
import java.util.Map;
+import java.util.Set;
import java.util.UUID;
import org.apache.qpid.server.configuration.updater.TaskExecutor;
@@ -78,6 +79,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>>
<T> T getContextValue(Class<T> clazz, String propertyName);
+ Set<String> getContextKeys();
+
@DerivedAttribute( persist = true )
String getLastUpdatedBy();
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 6fa7801608..e6cde6c934 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -20,7 +20,6 @@
*/
package org.apache.qpid.server.queue;
-import java.util.EnumMap;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
@@ -80,13 +79,17 @@ public abstract class QueueEntryImpl implements QueueEntry
private volatile long _entryId;
- private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties();
+ private static int REDELIVERED_FLAG = 1;
+ private static int PERSISTENT_FLAG = 2;
+ private static int MANDATORY_FLAG = 4;
+ private static int IMMEDIATE_FLAG = 8;
+ private int _flags;
+ private long _expiration;
/** Number of times this message has been delivered */
- private volatile int _deliveryCount = 0;
+ private volatile int _deliveryCount = -1;
private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater
.newUpdater(QueueEntryImpl.class, "_deliveryCount");
- private boolean _deliveredToConsumer;
public QueueEntryImpl(QueueEntryList queueEntryList)
@@ -117,14 +120,17 @@ public abstract class QueueEntryImpl implements QueueEntry
{
if(_message != null)
{
- _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent());
- _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration());
+ if(_message.getMessage().isPersistent())
+ {
+ setPersistent();
+ }
+ _expiration = _message.getMessage().getExpiration();
}
}
public InstanceProperties getInstanceProperties()
{
- return _instanceProperties;
+ return new EntryInstanceProperties();
}
protected void setEntryId(long entryId)
@@ -154,21 +160,17 @@ public abstract class QueueEntryImpl implements QueueEntry
public boolean getDeliveredToConsumer()
{
- return _deliveredToConsumer;
+ return _deliveryCountUpdater.get(this) != -1;
}
public boolean expired()
{
- ServerMessage message = getMessage();
- if(message != null)
+ long expiration = _expiration;
+ if (expiration != 0L)
{
- long expiration = message.getExpiration();
- if (expiration != 0L)
- {
- long now = System.currentTimeMillis();
+ long now = System.currentTimeMillis();
- return (now > expiration);
- }
+ return (now > expiration);
}
return false;
@@ -206,7 +208,7 @@ public abstract class QueueEntryImpl implements QueueEntry
final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState());
if(acquired)
{
- _deliveredToConsumer = true;
+ _deliveryCountUpdater.compareAndSet(this,-1,0);
}
return acquired;
}
@@ -253,15 +255,6 @@ public abstract class QueueEntryImpl implements QueueEntry
}
- public void setRedelivered()
- {
- _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE);
- }
-
- public boolean isRedelivered()
- {
- return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED));
- }
public QueueConsumer getDeliveredConsumer()
{
@@ -459,7 +452,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public int getDeliveryCount()
{
- return _deliveryCount;
+ return _deliveryCount == -1 ? 0 : _deliveryCount;
}
@Override
@@ -470,6 +463,7 @@ public abstract class QueueEntryImpl implements QueueEntry
public void incrementDeliveryCount()
{
+ _deliveryCountUpdater.compareAndSet(this,-1,0);
_deliveryCountUpdater.incrementAndGet(this);
}
@@ -509,20 +503,45 @@ public abstract class QueueEntryImpl implements QueueEntry
return getQueue();
}
- private static class EntryInstanceProperties implements InstanceProperties
+ public void setRedelivered()
+ {
+ _flags |= REDELIVERED_FLAG;
+ }
+
+ private void setPersistent()
+ {
+ _flags |= PERSISTENT_FLAG;
+ }
+
+ public boolean isRedelivered()
+ {
+ return (_flags & REDELIVERED_FLAG) != 0;
+ }
+
+ private class EntryInstanceProperties implements InstanceProperties
{
- private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class);
@Override
public Object getProperty(final Property prop)
{
- return _properties.get(prop);
- }
+ switch(prop)
+ {
- private void setProperty(Property prop, Object value)
- {
- _properties.put(prop, value);
+ case REDELIVERED:
+ return (_flags & REDELIVERED_FLAG) != 0;
+ case PERSISTENT:
+ return (_flags & PERSISTENT_FLAG) != 0;
+ case MANDATORY:
+ return (_flags & MANDATORY_FLAG) != 0;
+ case IMMEDIATE:
+ return (_flags & IMMEDIATE_FLAG) != 0;
+ case EXPIRATION:
+ return _expiration;
+ default:
+ throw new IllegalArgumentException("Unknown property " + prop);
+ }
}
+
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
index f2f85e1387..1a1085339d 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java
@@ -716,8 +716,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
- private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
- Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
+ private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId,
+ Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException
{
Connection conn = connWrapper.getConnection();
@@ -738,6 +738,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
stmt.close();
}
+ List<Runnable> postActions = new ArrayList<>();
+ for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues)
+ {
+ StoredMessage storedMessage = enqueue.getMessage().getStoredMessage();
+ if(storedMessage instanceof StoredJDBCMessage)
+ {
+ postActions.add(((StoredJDBCMessage) storedMessage).store(conn));
+ }
+ }
+
+
stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS);
try
@@ -773,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
stmt.close();
}
-
+ return postActions;
}
catch (SQLException e)
{
@@ -1105,6 +1116,47 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
}
+
+ private byte[] getAllContent(long messageId)
+ {
+ Connection conn = null;
+ PreparedStatement stmt = null;
+
+ try
+ {
+ conn = newAutoCommitConnection();
+
+ stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT);
+ stmt.setLong(1,messageId);
+ ResultSet rs = stmt.executeQuery();
+
+ int written = 0;
+
+ if (rs.next())
+ {
+
+ byte[] dataAsBytes = getBlobAsBytes(rs, 1);
+ return dataAsBytes;
+ }
+
+ throw new StoreException("No such message, id: " + messageId);
+
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e);
+ }
+ finally
+ {
+ JdbcUtils.closePreparedStatement(stmt, getLogger());
+ JdbcUtils.closeConnection(conn, getLogger());
+ }
+
+
+ }
+
+
+
@Override
public boolean isPersistent()
{
@@ -1116,7 +1168,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
private final ConnectionWrapper _connWrapper;
private int _storeSizeIncrease;
- private final List<Runnable> _onCommitActions = new ArrayList<>();
+ private final List<Runnable> _preCommitActions = new ArrayList<>();
+ private final List<Runnable> _postCommitActions = new ArrayList<>();
protected JDBCTransaction()
{
@@ -1138,19 +1191,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
final StoredMessage storedMessage = message.getStoredMessage();
if(storedMessage instanceof StoredJDBCMessage)
{
- _onCommitActions.add(new Runnable()
+ _preCommitActions.add(new Runnable()
{
@Override
public void run()
{
try
{
- ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection());
+ _postCommitActions.add(((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()));
_storeSizeIncrease += storedMessage.getMetaData().getContentSize();
}
catch (SQLException e)
{
- throw new StoreException("Exception on enqueuing message into message store" + _messageId, e);
+ throw new StoreException("Exception on enqueuing message into message store" + _messageId,
+ e);
}
}
});
@@ -1174,6 +1228,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
doPreCommitActions();
AbstractJDBCMessageStore.this.commitTran(_connWrapper);
storedSizeChange(_storeSizeIncrease);
+ doPostCommitActions();
}
@Override
@@ -1183,23 +1238,33 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
doPreCommitActions();
StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper);
storedSizeChange(_storeSizeIncrease);
+ doPostCommitActions();
return storeFuture;
}
private void doPreCommitActions()
{
- for(Runnable action : _onCommitActions)
+ for(Runnable action : _preCommitActions)
+ {
+ action.run();
+ }
+ _preCommitActions.clear();
+ }
+
+ private void doPostCommitActions()
+ {
+ for(Runnable action : _postCommitActions)
{
action.run();
}
- _onCommitActions.clear();
+ _postCommitActions.clear();
}
@Override
public void abortTran()
{
checkMessageStoreOpen();
- _onCommitActions.clear();
+ _preCommitActions.clear();
AbstractJDBCMessageStore.this.abortTran(_connWrapper);
}
@@ -1216,56 +1281,171 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
{
checkMessageStoreOpen();
- AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues);
+ _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues));
}
}
- private class StoredJDBCMessage implements StoredMessage
+
+ static interface MessageDataRef<T extends StorableMessageMetaData>
{
+ T getMetaData();
+ byte[] getData();
+ void setData(byte[] data);
+ boolean isHardRef();
+ }
- private final long _messageId;
- private final boolean _isRecovered;
- private StorableMessageMetaData _metaData;
- private volatile SoftReference<StorableMessageMetaData> _metaDataRef;
+ private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T>
+ {
+ private final T _metaData;
private byte[] _data;
- private volatile SoftReference<byte[]> _dataRef;
+
+ private MessageDataHardRef(final T metaData)
+ {
+ _metaData = metaData;
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ return _data;
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ _data = data;
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return true;
+ }
+ }
+
+ private static final class MessageData<T extends StorableMessageMetaData>
+ {
+ private T _metaData;
+ private SoftReference<byte[]> _data;
+
+ private MessageData(final T metaData, final byte[] data)
+ {
+ _metaData = metaData;
+
+ if(data != null)
+ {
+ _data = new SoftReference<>(data);
+ }
+ }
+
+ public T getMetaData()
+ {
+ return _metaData;
+ }
+
+ public byte[] getData()
+ {
+ return _data == null ? null : _data.get();
+ }
+
+ public void setData(final byte[] data)
+ {
+ _data = new SoftReference<>(data);
+ }
+
+
+ }
+ private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T>
+ {
+
+ public MessageDataSoftRef(final T metadata, byte[] data)
+ {
+ super(new MessageData<T>(metadata, data));
+ }
+
+ @Override
+ public T getMetaData()
+ {
+ MessageData<T> ref = get();
+ return ref == null ? null : ref.getMetaData();
+ }
+
+ @Override
+ public byte[] getData()
+ {
+ MessageData<T> ref = get();
+
+ return ref == null ? null : ref.getData();
+ }
+
+ @Override
+ public void setData(final byte[] data)
+ {
+ MessageData<T> ref = get();
+ if(ref != null)
+ {
+ ref.setData(data);
+ }
+ }
+
+ @Override
+ public boolean isHardRef()
+ {
+ return false;
+ }
+ }
+
+ private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T>
+ {
+
+ private final long _messageId;
+
+ private volatile MessageDataRef<T> _messageDataRef;
- StoredJDBCMessage(long messageId, StorableMessageMetaData metaData)
+ StoredJDBCMessage(long messageId, T metaData)
{
this(messageId, metaData, false);
}
StoredJDBCMessage(long messageId,
- StorableMessageMetaData metaData, boolean isRecovered)
+ T metaData, boolean isRecovered)
{
_messageId = messageId;
- _isRecovered = isRecovered;
- if(!_isRecovered)
+ if(!isRecovered)
{
- _metaData = metaData;
+ _messageDataRef = new MessageDataHardRef<>(metaData);
+ }
+ else
+ {
+ _messageDataRef = new MessageDataSoftRef<>(metaData, null);
}
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
@Override
- public StorableMessageMetaData getMetaData()
+ public T getMetaData()
{
- StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData;
+ T metaData = _messageDataRef.getMetaData();
if(metaData == null)
{
checkMessageStoreOpen();
try
{
- metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData,null);
}
catch (SQLException e)
{
throw new StoreException(e);
}
- _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData);
}
return metaData;
@@ -1281,21 +1461,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
public void addContent(int offsetInMessage, ByteBuffer src)
{
src = src.slice();
+ byte[] data = _messageDataRef.getData();
- if(_data == null)
+ if(data == null)
{
- _data = new byte[src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
- src.duplicate().get(_data);
+ data = new byte[src.remaining()];
+ src.duplicate().get(data);
+ _messageDataRef.setData(data);
}
else
{
- byte[] oldData = _data;
- _data = new byte[oldData.length + src.remaining()];
- _dataRef = new SoftReference<byte[]>(_data);
+ byte[] oldData = data;
+ data = new byte[oldData.length + src.remaining()];
+
+ System.arraycopy(oldData,0,data,0,oldData.length);
+ src.duplicate().get(data, oldData.length, src.remaining());
- System.arraycopy(oldData,0,_data,0,oldData.length);
- src.duplicate().get(_data, oldData.length, src.remaining());
+ _messageDataRef.setData(data);
}
}
@@ -1303,34 +1485,90 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
@Override
public int getContent(int offsetInMessage, ByteBuffer dst)
{
- byte[] data = _dataRef == null ? null : _dataRef.get();
- if(data != null)
- {
- int length = Math.min(dst.remaining(), data.length - offsetInMessage);
- dst.put(data, offsetInMessage, length);
- return length;
- }
- else
+ byte[] data = _messageDataRef.getData();
+
+ if(data == null)
{
- checkMessageStoreOpen();
- return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst);
+ if(stored())
+ {
+ checkMessageStoreOpen();
+ getLogger().debug("GET CONTENT for message id " + _messageId);
+ data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ try
+ {
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
}
+
+ int length = Math.min(dst.remaining(), data.length - offsetInMessage);
+ dst.put(data, offsetInMessage, length);
+ return length;
+
}
@Override
public ByteBuffer getContent(int offsetInMessage, int size)
{
- ByteBuffer buf = ByteBuffer.allocate(size);
- int length = getContent(offsetInMessage, buf);
- buf.position(0);
- buf.limit(length);
- return buf;
+ byte[] data = _messageDataRef.getData();
+
+ if(data == null)
+ {
+
+ if(stored())
+ {
+ checkMessageStoreOpen();
+
+ data = AbstractJDBCMessageStore.this.getAllContent(_messageId);
+ T metaData = _messageDataRef.getMetaData();
+ if (metaData == null)
+ {
+ try
+ {
+ metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId);
+ _messageDataRef = new MessageDataSoftRef<>(metaData, data);
+ }
+ catch (SQLException e)
+ {
+ throw new StoreException(e);
+ }
+ }
+ else
+ {
+ _messageDataRef.setData(data);
+ }
+ }
+ else
+ {
+ data = new byte[0];
+ }
+ }
+ return ByteBuffer.wrap(data,offsetInMessage,size);
+
}
@Override
public void remove()
{
+ getLogger().debug("REMOVE called on message: " + _messageId);
checkMessageStoreOpen();
int delta = getMetaData().getContentSize();
@@ -1338,32 +1576,69 @@ public abstract class AbstractJDBCMessageStore implements MessageStore
storedSizeChange(-delta);
}
- private synchronized void store(final Connection conn) throws SQLException
+ private synchronized Runnable store(final Connection conn) throws SQLException
{
if (!stored())
{
- try
+ getLogger().debug("STORING message id " + _messageId);
+ storeMetaData(conn, _messageId, _messageDataRef.getMetaData());
+ AbstractJDBCMessageStore.this.addContent(conn, _messageId,
+ _messageDataRef.getData() == null
+ ? ByteBuffer.allocate(0)
+ : ByteBuffer.wrap(_messageDataRef.getData()));
+
+
+ if(getLogger().isDebugEnabled())
{
- storeMetaData(conn, _messageId, _metaData);
- AbstractJDBCMessageStore.this.addContent(conn, _messageId,
- _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data));
+ getLogger().debug("Storing message " + _messageId + " to store");
}
- finally
+
+ MessageDataRef<T> hardRef = _messageDataRef;
+ MessageDataSoftRef<T> messageDataSoftRef;
+ MessageData<T> ref;
+ do
{
- _metaData = null;
- _data = null;
+ messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData());
+ ref = messageDataSoftRef.get();
}
+ while (ref == null);
- if(getLogger().isDebugEnabled())
+ _messageDataRef = messageDataSoftRef;
+
+ class Pointer implements Runnable
{
- getLogger().debug("Storing message " + _messageId + " to store");
+ private MessageData<T> _ref;
+
+ Pointer(final MessageData<T> ref)
+ {
+ getLogger().debug("POST COMMIT for message id " + _messageId);
+ _ref = ref;
+ }
+
+ @Override
+ public void run()
+ {
+ _ref = null;
+ }
}
+ return new Pointer(ref);
+ }
+ else
+ {
+ return new Runnable()
+ {
+
+ @Override
+ public void run()
+ {
+ }
+ };
}
}
private boolean stored()
{
- return _metaData == null || _isRecovered;
+ return !_messageDataRef.isHardRef();
}
}
diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
index fbe402c4a1..df9ffda3a3 100644
--- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
+++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java
@@ -90,7 +90,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard
if (host == null)
{
- boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
+ boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR);
boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)
&& Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR)));