diff options
Diffstat (limited to 'qpid/java')
19 files changed, 852 insertions, 220 deletions
diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java index be592a0d42..7e5f5bbb3f 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/AbstractBDBMessageStore.java @@ -558,6 +558,43 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } + byte[] getAllContent(long messageId) throws StoreException + { + DatabaseEntry contentKeyEntry = new DatabaseEntry(); + LongBinding.longToEntry(messageId, contentKeyEntry); + DatabaseEntry value = new DatabaseEntry(); + ContentBinding contentTupleBinding = ContentBinding.getInstance(); + + + if (getLogger().isDebugEnabled()) + { + getLogger().debug("Message Id: " + messageId + " Getting content body"); + } + + try + { + + int written = 0; + OperationStatus status = getMessageContentDb().get(null, contentKeyEntry, value, LockMode.READ_UNCOMMITTED); + if (status == OperationStatus.SUCCESS) + { + return contentTupleBinding.entryToObject(value); + } + else + { + throw new StoreException("Unable to find message with id " + messageId); + } + + } + catch (DatabaseException e) + { + throw getEnvironmentFacade().handleDatabaseException("Error getting AMQMessage with id " + + messageId + + " to database: " + + e.getMessage(), e); + } + } + private void visitMessagesInternal(MessageHandler handler, EnvironmentFacade environmentFacade) { Cursor cursor = null; @@ -810,12 +847,12 @@ public abstract class AbstractBDBMessageStore implements MessageStore } } - private void recordXid(Transaction txn, - long format, - byte[] globalId, - byte[] branchId, - org.apache.qpid.server.store.Transaction.Record[] enqueues, - org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException + private List<Runnable> recordXid(Transaction txn, + long format, + byte[] globalId, + byte[] branchId, + org.apache.qpid.server.store.Transaction.Record[] enqueues, + org.apache.qpid.server.store.Transaction.Record[] dequeues) throws StoreException { DatabaseEntry key = new DatabaseEntry(); Xid xid = new Xid(format, globalId, branchId); @@ -826,10 +863,20 @@ public abstract class AbstractBDBMessageStore implements MessageStore PreparedTransaction preparedTransaction = new PreparedTransaction(enqueues, dequeues); PreparedTransactionBinding valueBinding = new PreparedTransactionBinding(); valueBinding.objectToEntry(preparedTransaction, value); + List<Runnable> postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredBDBMessage) + { + postActions.add(((StoredBDBMessage) storedMessage).store(txn)); + } + } try { getXidDb().put(txn, key, value); + return postActions; } catch (DatabaseException e) { @@ -1041,17 +1088,127 @@ public abstract class AbstractBDBMessageStore implements MessageStore protected abstract Logger getLogger(); - class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + static interface MessageDataRef<T extends StorableMessageMetaData> { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; + private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T> + { + private final T _metaData; + private byte[] _data; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData<T extends StorableMessageMetaData> + { private T _metaData; - private volatile SoftReference<T> _metaDataRef; + private SoftReference<byte[]> _data; - private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T> + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData<T>(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData<T> ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData<T> ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData<T> ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + final class StoredBDBMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + + private volatile MessageDataRef<T> _messageDataRef; StoredBDBMessage(long messageId, T metaData) { @@ -1061,27 +1218,28 @@ public abstract class AbstractBDBMessageStore implements MessageStore StoredBDBMessage(long messageId, T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference<T>(metaData); } @Override public T getMetaData() { - T metaData = _metaDataRef.get(); + T metaData = _messageDataRef.getMetaData(); + if(metaData == null) { checkMessageStoreOpen(); - metaData = (T) getMessageMetaData(_messageId); - _metaDataRef = new SoftReference<T>(metaData); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } - return metaData; } @@ -1095,21 +1253,23 @@ public abstract class AbstractBDBMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); - - if(_data == null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + + System.arraycopy(oldData, 0, data, 0, oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData, 0, _data, 0, oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1117,55 +1277,116 @@ public abstract class AbstractBDBMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) + byte[] data = _messageDataRef.getData(); + if(data == null) { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } - else - { - checkMessageStoreOpen(); - return AbstractBDBMessageStore.this.getContent(_messageId, offsetInMessage, dst); - } + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - return ByteBuffer.wrap(data,offsetInMessage,size); - } - else + byte[] data = _messageDataRef.getData(); + if(data == null) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.limit(length); - buf.position(0); - return buf; + if(stored()) + { + checkMessageStoreOpen(); + data = AbstractBDBMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + metaData = (T) getMessageMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + return ByteBuffer.wrap(data,offsetInMessage,size); + } - synchronized void store(Transaction txn) + synchronized Runnable store(Transaction txn) { if (!stored()) { - try + + AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _messageDataRef.getMetaData()); + AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + MessageDataRef<T> hardRef = _messageDataRef; + MessageDataSoftRef<T> messageDataSoftRef; + MessageData<T> ref; + do { - _dataRef = new SoftReference<byte[]>(_data); - AbstractBDBMessageStore.this.storeMetaData(txn, _messageId, _metaData); - AbstractBDBMessageStore.this.addContent(txn, _messageId, 0, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } - finally + while (ref == null); + + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - _metaData = null; - _data = null; + private MessageData<T> _ref; + + Pointer(final MessageData<T> ref) + { + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } @@ -1205,7 +1426,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } @Override @@ -1220,7 +1441,8 @@ public abstract class AbstractBDBMessageStore implements MessageStore { private Transaction _txn; private int _storeSizeIncrease; - private final List<Runnable> _onCommitActions = new ArrayList<>(); + private final List<Runnable> _preCommitActions = new ArrayList<>(); + private final List<Runnable> _postCommitActions = new ArrayList<>(); private BDBTransaction() throws StoreException { @@ -1242,13 +1464,14 @@ public abstract class AbstractBDBMessageStore implements MessageStore if(message.getStoredMessage() instanceof StoredBDBMessage) { final StoredBDBMessage storedMessage = (StoredBDBMessage) message.getStoredMessage(); - _onCommitActions.add(new Runnable() + final long contentSize = storedMessage.getMetaData().getContentSize(); + _preCommitActions.add(new Runnable() { @Override public void run() { - storedMessage.store(_txn); - _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); + _postCommitActions.add(storedMessage.store(_txn)); + _storeSizeIncrease += contentSize; } }); @@ -1271,16 +1494,26 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.commitTranImpl(_txn, true); + doPostCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override @@ -1289,14 +1522,17 @@ public abstract class AbstractBDBMessageStore implements MessageStore checkMessageStoreOpen(); doPreCommitActions(); AbstractBDBMessageStore.this.storedSizeChangeOccurred(_storeSizeIncrease); - return AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + StoreFuture storeFuture = AbstractBDBMessageStore.this.commitTranImpl(_txn, false); + doPostCommitActions(); + return storeFuture; } @Override public void abortTran() throws StoreException { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); + _postCommitActions.clear(); AbstractBDBMessageStore.this.abortTran(_txn); } @@ -1314,7 +1550,7 @@ public abstract class AbstractBDBMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractBDBMessageStore.this.recordXid(_txn, format, globalId, branchId, enqueues, dequeues)); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java index ab0d1ab9ef..4c3b72a29b 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/StandardEnvironmentFacadeFactory.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.store.berkeleydb; import java.util.Collections; import java.util.HashMap; import java.util.Map; +import java.util.Set; import com.sleepycat.je.config.ConfigParam; import com.sleepycat.je.config.EnvironmentParams; @@ -65,16 +66,15 @@ public class StandardEnvironmentFacadeFactory implements EnvironmentFacadeFactor private Map<String, String> buildEnvironmentConfiguration(ConfiguredObject<?> parent) { - final Map<String, String> context = parent.getContext(); Map<String, String> envConfigMap = new HashMap<>(); for (ConfigParam cp : EnvironmentParams.SUPPORTED_PARAMS.values()) { final String parameterName = cp.getName(); - if (context.containsKey(parameterName) && !cp.isForReplication()) + Set<String> contextKeys = parent.getContextKeys(); + if (!cp.isForReplication() && contextKeys.contains(parameterName)) { - String contextValue = context.get(parameterName); - envConfigMap.put(parameterName, contextValue); + envConfigMap.put(parameterName, parent.getContextValue(String.class, parameterName)); } } return Collections.unmodifiableMap(envConfigMap); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java index 4f21baf42f..37d53319b5 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/replication/ReplicatedEnvironmentFacadeFactory.java @@ -126,11 +126,11 @@ public class ReplicatedEnvironmentFacadeFactory implements EnvironmentFacadeFact private Map<String, String> buildConfig(ConfiguredObject<?> parent, Pattern paramName) { Map<String, String> targetMap = new HashMap<>(); - for (String name : parent.getContext().keySet()) + for (String name : parent.getContextKeys()) { if (paramName.matcher(name).matches()) { - String contextValue = parent.getContext().get(name); + String contextValue = parent.getContextValue(String.class,name); targetMap.put(name, contextValue); } } diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java index cc0032845f..12511ad9e0 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhost/berkeleydb/BDBVirtualHost.java @@ -22,6 +22,7 @@ package org.apache.qpid.server.virtualhost.berkeleydb; import org.apache.qpid.server.exchange.ExchangeImpl; import org.apache.qpid.server.model.ManagedAttribute; +import org.apache.qpid.server.model.ManagedContextDefault; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.store.SizeMonitoringSettings; import org.apache.qpid.server.virtualhost.VirtualHostImpl; @@ -31,6 +32,12 @@ public interface BDBVirtualHost<X extends BDBVirtualHost<X>> extends VirtualHost String STORE_PATH = "storePath"; + // Default the JE cache to 5% of total memory, but no less than 10Mb and no more than 200Mb + @ManagedContextDefault(name="je.maxMemory") + long DEFAULT_JE_CACHE_SIZE = Math.max(10l*1024l*1024l, + Math.min(200l*1024l*1024l, + Runtime.getRuntime().maxMemory()/20l)); + @ManagedAttribute(mandatory = true) String getStorePath(); diff --git a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java index 388e2e7608..061fa12768 100644 --- a/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java +++ b/qpid/java/bdbstore/src/main/java/org/apache/qpid/server/virtualhostnode/berkeleydb/BDBHAVirtualHostNodeImpl.java @@ -418,7 +418,7 @@ public class BDBHAVirtualHostNodeImpl extends AbstractVirtualHostNode<BDBHAVirtu LOGGER.debug("Creating new virtualhost with name : " + getGroupName()); } - boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); + boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) && Boolean.parseBoolean(String.valueOf(getContext().get( VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); diff --git a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java index 847f857491..1c5b705fa0 100644 --- a/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java +++ b/qpid/java/bdbstore/src/test/java/org/apache/qpid/server/store/berkeleydb/BDBMessageStoreQuotaEventsTest.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.store.berkeleydb; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; +import java.util.Map; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; import org.apache.qpid.server.virtualhost.berkeleydb.BDBVirtualHost; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { /* @@ -60,7 +62,10 @@ public class BDBMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestB protected VirtualHost createVirtualHost(String storeLocation) { final BDBVirtualHost parent = mock(BDBVirtualHost.class); - when(parent.getContext()).thenReturn(Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE)); + Map<String, String> contextMap = Collections.singletonMap("je.log.fileMax", MAX_BDB_LOG_SIZE); + when(parent.getContext()).thenReturn(contextMap); + when(parent.getContextKeys()).thenReturn(contextMap.keySet()); + when(parent.getContextValue(eq(String.class),eq("je.log.fileMax"))).thenReturn(MAX_BDB_LOG_SIZE); when(parent.getStorePath()).thenReturn(storeLocation); when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE); when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java index 556cfcb266..198c0a1cb9 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/AbstractServerMessageImpl.java @@ -20,20 +20,20 @@ */ package org.apache.qpid.server.message; +import java.nio.ByteBuffer; +import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; + import org.apache.qpid.server.store.StorableMessageMetaData; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ServerScopedRuntimeException; -import java.nio.ByteBuffer; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicIntegerFieldUpdater; - public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> implements ServerMessage<T> { private static final AtomicIntegerFieldUpdater<AbstractServerMessageImpl> _refCountUpdater = AtomicIntegerFieldUpdater.newUpdater(AbstractServerMessageImpl.class, "_referenceCount"); + private volatile int _referenceCount = 0; private final StoredMessage<T> _handle; private final Object _connectionReference; @@ -113,7 +113,7 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI @Override final public MessageReference<X> newReference() { - return new Reference(); + return new Reference(this); } @Override @@ -148,26 +148,32 @@ public abstract class AbstractServerMessageImpl<X extends AbstractServerMessageI return "Message[" + debugIdentity() + "]"; } - private final class Reference implements MessageReference<X> + private static class Reference<X extends AbstractServerMessageImpl<X,T>, T extends StorableMessageMetaData> + implements MessageReference<X> { - private final AtomicBoolean _released = new AtomicBoolean(false); + private static final AtomicIntegerFieldUpdater<Reference> _releasedUpdater = + AtomicIntegerFieldUpdater.newUpdater(Reference.class, "_released"); + + private AbstractServerMessageImpl<X, T> _message; + private volatile int _released; - private Reference() + private Reference(final AbstractServerMessageImpl<X, T> message) { - incrementReference(); + _message = message; + _message.incrementReference(); } public X getMessage() { - return (X) AbstractServerMessageImpl.this; + return (X) _message; } - public void release() + public synchronized void release() { - if(!_released.getAndSet(true)) + if(_releasedUpdater.compareAndSet(this,0,1)) { - decrementReference(); + _message.decrementReference(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java index 1d7b8627f4..af9a252077 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/message/InstanceProperties.java @@ -92,5 +92,7 @@ public interface InstanceProperties return _props.get(prop); } } + + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java index 83e2cc06ec..49d3fea8fd 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/AbstractConfiguredObject.java @@ -1339,6 +1339,14 @@ public abstract class AbstractConfiguredObject<X extends ConfiguredObject<X>> im return converter.convert("${" + propertyName + "}", this); } + @Override + public Set<String> getContextKeys() + { + Map<String,String> inheritedContext = new HashMap<>(); + generateInheritedContext(getModel(), this, inheritedContext); + return Collections.unmodifiableSet(inheritedContext.keySet()); + } + private OwnAttributeResolver getOwnAttributeResolver() { return _attributeResolver; diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java index 2301f23773..01ca2fa646 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/model/ConfiguredObject.java @@ -23,6 +23,7 @@ package org.apache.qpid.server.model; import java.security.AccessControlException; import java.util.Collection; import java.util.Map; +import java.util.Set; import java.util.UUID; import org.apache.qpid.server.configuration.updater.TaskExecutor; @@ -78,6 +79,8 @@ public interface ConfiguredObject<X extends ConfiguredObject<X>> <T> T getContextValue(Class<T> clazz, String propertyName); + Set<String> getContextKeys(); + @DerivedAttribute( persist = true ) String getLastUpdatedBy(); diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 6fa7801608..e6cde6c934 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -20,7 +20,6 @@ */ package org.apache.qpid.server.queue; -import java.util.EnumMap; import java.util.HashSet; import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; @@ -80,13 +79,17 @@ public abstract class QueueEntryImpl implements QueueEntry private volatile long _entryId; - private final EntryInstanceProperties _instanceProperties = new EntryInstanceProperties(); + private static int REDELIVERED_FLAG = 1; + private static int PERSISTENT_FLAG = 2; + private static int MANDATORY_FLAG = 4; + private static int IMMEDIATE_FLAG = 8; + private int _flags; + private long _expiration; /** Number of times this message has been delivered */ - private volatile int _deliveryCount = 0; + private volatile int _deliveryCount = -1; private static final AtomicIntegerFieldUpdater<QueueEntryImpl> _deliveryCountUpdater = AtomicIntegerFieldUpdater .newUpdater(QueueEntryImpl.class, "_deliveryCount"); - private boolean _deliveredToConsumer; public QueueEntryImpl(QueueEntryList queueEntryList) @@ -117,14 +120,17 @@ public abstract class QueueEntryImpl implements QueueEntry { if(_message != null) { - _instanceProperties.setProperty(InstanceProperties.Property.PERSISTENT, _message.getMessage().isPersistent()); - _instanceProperties.setProperty(InstanceProperties.Property.EXPIRATION, _message.getMessage().getExpiration()); + if(_message.getMessage().isPersistent()) + { + setPersistent(); + } + _expiration = _message.getMessage().getExpiration(); } } public InstanceProperties getInstanceProperties() { - return _instanceProperties; + return new EntryInstanceProperties(); } protected void setEntryId(long entryId) @@ -154,21 +160,17 @@ public abstract class QueueEntryImpl implements QueueEntry public boolean getDeliveredToConsumer() { - return _deliveredToConsumer; + return _deliveryCountUpdater.get(this) != -1; } public boolean expired() { - ServerMessage message = getMessage(); - if(message != null) + long expiration = _expiration; + if (expiration != 0L) { - long expiration = message.getExpiration(); - if (expiration != 0L) - { - long now = System.currentTimeMillis(); + long now = System.currentTimeMillis(); - return (now > expiration); - } + return (now > expiration); } return false; @@ -206,7 +208,7 @@ public abstract class QueueEntryImpl implements QueueEntry final boolean acquired = acquire(((QueueConsumer<?>)sub).getOwningState()); if(acquired) { - _deliveredToConsumer = true; + _deliveryCountUpdater.compareAndSet(this,-1,0); } return acquired; } @@ -253,15 +255,6 @@ public abstract class QueueEntryImpl implements QueueEntry } - public void setRedelivered() - { - _instanceProperties.setProperty(InstanceProperties.Property.REDELIVERED, Boolean.TRUE); - } - - public boolean isRedelivered() - { - return Boolean.TRUE.equals(_instanceProperties.getProperty(InstanceProperties.Property.REDELIVERED)); - } public QueueConsumer getDeliveredConsumer() { @@ -459,7 +452,7 @@ public abstract class QueueEntryImpl implements QueueEntry public int getDeliveryCount() { - return _deliveryCount; + return _deliveryCount == -1 ? 0 : _deliveryCount; } @Override @@ -470,6 +463,7 @@ public abstract class QueueEntryImpl implements QueueEntry public void incrementDeliveryCount() { + _deliveryCountUpdater.compareAndSet(this,-1,0); _deliveryCountUpdater.incrementAndGet(this); } @@ -509,20 +503,45 @@ public abstract class QueueEntryImpl implements QueueEntry return getQueue(); } - private static class EntryInstanceProperties implements InstanceProperties + public void setRedelivered() + { + _flags |= REDELIVERED_FLAG; + } + + private void setPersistent() + { + _flags |= PERSISTENT_FLAG; + } + + public boolean isRedelivered() + { + return (_flags & REDELIVERED_FLAG) != 0; + } + + private class EntryInstanceProperties implements InstanceProperties { - private final EnumMap<Property, Object> _properties = new EnumMap<Property, Object>(Property.class); @Override public Object getProperty(final Property prop) { - return _properties.get(prop); - } + switch(prop) + { - private void setProperty(Property prop, Object value) - { - _properties.put(prop, value); + case REDELIVERED: + return (_flags & REDELIVERED_FLAG) != 0; + case PERSISTENT: + return (_flags & PERSISTENT_FLAG) != 0; + case MANDATORY: + return (_flags & MANDATORY_FLAG) != 0; + case IMMEDIATE: + return (_flags & IMMEDIATE_FLAG) != 0; + case EXPIRATION: + return _expiration; + default: + throw new IllegalArgumentException("Unknown property " + prop); + } } + } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java index f2f85e1387..1a1085339d 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/store/AbstractJDBCMessageStore.java @@ -716,8 +716,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } - private void recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, - Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException + private List<Runnable> recordXid(ConnectionWrapper connWrapper, long format, byte[] globalId, byte[] branchId, + Transaction.Record[] enqueues, Transaction.Record[] dequeues) throws StoreException { Connection conn = connWrapper.getConnection(); @@ -738,6 +738,17 @@ public abstract class AbstractJDBCMessageStore implements MessageStore stmt.close(); } + List<Runnable> postActions = new ArrayList<>(); + for(org.apache.qpid.server.store.Transaction.Record enqueue : enqueues) + { + StoredMessage storedMessage = enqueue.getMessage().getStoredMessage(); + if(storedMessage instanceof StoredJDBCMessage) + { + postActions.add(((StoredJDBCMessage) storedMessage).store(conn)); + } + } + + stmt = conn.prepareStatement(INSERT_INTO_XID_ACTIONS); try @@ -773,7 +784,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { stmt.close(); } - + return postActions; } catch (SQLException e) { @@ -1105,6 +1116,47 @@ public abstract class AbstractJDBCMessageStore implements MessageStore } + + private byte[] getAllContent(long messageId) + { + Connection conn = null; + PreparedStatement stmt = null; + + try + { + conn = newAutoCommitConnection(); + + stmt = conn.prepareStatement(SELECT_FROM_MESSAGE_CONTENT); + stmt.setLong(1,messageId); + ResultSet rs = stmt.executeQuery(); + + int written = 0; + + if (rs.next()) + { + + byte[] dataAsBytes = getBlobAsBytes(rs, 1); + return dataAsBytes; + } + + throw new StoreException("No such message, id: " + messageId); + + } + catch (SQLException e) + { + throw new StoreException("Error retrieving content for message " + messageId + ": " + e.getMessage(), e); + } + finally + { + JdbcUtils.closePreparedStatement(stmt, getLogger()); + JdbcUtils.closeConnection(conn, getLogger()); + } + + + } + + + @Override public boolean isPersistent() { @@ -1116,7 +1168,8 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { private final ConnectionWrapper _connWrapper; private int _storeSizeIncrease; - private final List<Runnable> _onCommitActions = new ArrayList<>(); + private final List<Runnable> _preCommitActions = new ArrayList<>(); + private final List<Runnable> _postCommitActions = new ArrayList<>(); protected JDBCTransaction() { @@ -1138,19 +1191,20 @@ public abstract class AbstractJDBCMessageStore implements MessageStore final StoredMessage storedMessage = message.getStoredMessage(); if(storedMessage instanceof StoredJDBCMessage) { - _onCommitActions.add(new Runnable() + _preCommitActions.add(new Runnable() { @Override public void run() { try { - ((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection()); + _postCommitActions.add(((StoredJDBCMessage) storedMessage).store(_connWrapper.getConnection())); _storeSizeIncrease += storedMessage.getMetaData().getContentSize(); } catch (SQLException e) { - throw new StoreException("Exception on enqueuing message into message store" + _messageId, e); + throw new StoreException("Exception on enqueuing message into message store" + _messageId, + e); } } }); @@ -1174,6 +1228,7 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); AbstractJDBCMessageStore.this.commitTran(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); } @Override @@ -1183,23 +1238,33 @@ public abstract class AbstractJDBCMessageStore implements MessageStore doPreCommitActions(); StoreFuture storeFuture = AbstractJDBCMessageStore.this.commitTranAsync(_connWrapper); storedSizeChange(_storeSizeIncrease); + doPostCommitActions(); return storeFuture; } private void doPreCommitActions() { - for(Runnable action : _onCommitActions) + for(Runnable action : _preCommitActions) + { + action.run(); + } + _preCommitActions.clear(); + } + + private void doPostCommitActions() + { + for(Runnable action : _postCommitActions) { action.run(); } - _onCommitActions.clear(); + _postCommitActions.clear(); } @Override public void abortTran() { checkMessageStoreOpen(); - _onCommitActions.clear(); + _preCommitActions.clear(); AbstractJDBCMessageStore.this.abortTran(_connWrapper); } @@ -1216,56 +1281,171 @@ public abstract class AbstractJDBCMessageStore implements MessageStore { checkMessageStoreOpen(); - AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues); + _postCommitActions.addAll(AbstractJDBCMessageStore.this.recordXid(_connWrapper, format, globalId, branchId, enqueues, dequeues)); } } - private class StoredJDBCMessage implements StoredMessage + + static interface MessageDataRef<T extends StorableMessageMetaData> { + T getMetaData(); + byte[] getData(); + void setData(byte[] data); + boolean isHardRef(); + } - private final long _messageId; - private final boolean _isRecovered; - private StorableMessageMetaData _metaData; - private volatile SoftReference<StorableMessageMetaData> _metaDataRef; + private static final class MessageDataHardRef<T extends StorableMessageMetaData> implements MessageDataRef<T> + { + private final T _metaData; private byte[] _data; - private volatile SoftReference<byte[]> _dataRef; + + private MessageDataHardRef(final T metaData) + { + _metaData = metaData; + } + + @Override + public T getMetaData() + { + return _metaData; + } + + @Override + public byte[] getData() + { + return _data; + } + + @Override + public void setData(final byte[] data) + { + _data = data; + } + + @Override + public boolean isHardRef() + { + return true; + } + } + + private static final class MessageData<T extends StorableMessageMetaData> + { + private T _metaData; + private SoftReference<byte[]> _data; + + private MessageData(final T metaData, final byte[] data) + { + _metaData = metaData; + + if(data != null) + { + _data = new SoftReference<>(data); + } + } + + public T getMetaData() + { + return _metaData; + } + + public byte[] getData() + { + return _data == null ? null : _data.get(); + } + + public void setData(final byte[] data) + { + _data = new SoftReference<>(data); + } + + + } + private static final class MessageDataSoftRef<T extends StorableMessageMetaData> extends SoftReference<MessageData<T>> implements MessageDataRef<T> + { + + public MessageDataSoftRef(final T metadata, byte[] data) + { + super(new MessageData<T>(metadata, data)); + } + + @Override + public T getMetaData() + { + MessageData<T> ref = get(); + return ref == null ? null : ref.getMetaData(); + } + + @Override + public byte[] getData() + { + MessageData<T> ref = get(); + + return ref == null ? null : ref.getData(); + } + + @Override + public void setData(final byte[] data) + { + MessageData<T> ref = get(); + if(ref != null) + { + ref.setData(data); + } + } + + @Override + public boolean isHardRef() + { + return false; + } + } + + private class StoredJDBCMessage<T extends StorableMessageMetaData> implements StoredMessage<T> + { + + private final long _messageId; + + private volatile MessageDataRef<T> _messageDataRef; - StoredJDBCMessage(long messageId, StorableMessageMetaData metaData) + StoredJDBCMessage(long messageId, T metaData) { this(messageId, metaData, false); } StoredJDBCMessage(long messageId, - StorableMessageMetaData metaData, boolean isRecovered) + T metaData, boolean isRecovered) { _messageId = messageId; - _isRecovered = isRecovered; - if(!_isRecovered) + if(!isRecovered) { - _metaData = metaData; + _messageDataRef = new MessageDataHardRef<>(metaData); + } + else + { + _messageDataRef = new MessageDataSoftRef<>(metaData, null); } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } @Override - public StorableMessageMetaData getMetaData() + public T getMetaData() { - StorableMessageMetaData metaData = _metaData == null ? _metaDataRef.get() : _metaData; + T metaData = _messageDataRef.getMetaData(); if(metaData == null) { checkMessageStoreOpen(); try { - metaData = AbstractJDBCMessageStore.this.getMetaData(_messageId); + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData,null); } catch (SQLException e) { throw new StoreException(e); } - _metaDataRef = new SoftReference<StorableMessageMetaData>(metaData); } return metaData; @@ -1281,21 +1461,23 @@ public abstract class AbstractJDBCMessageStore implements MessageStore public void addContent(int offsetInMessage, ByteBuffer src) { src = src.slice(); + byte[] data = _messageDataRef.getData(); - if(_data == null) + if(data == null) { - _data = new byte[src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); - src.duplicate().get(_data); + data = new byte[src.remaining()]; + src.duplicate().get(data); + _messageDataRef.setData(data); } else { - byte[] oldData = _data; - _data = new byte[oldData.length + src.remaining()]; - _dataRef = new SoftReference<byte[]>(_data); + byte[] oldData = data; + data = new byte[oldData.length + src.remaining()]; + + System.arraycopy(oldData,0,data,0,oldData.length); + src.duplicate().get(data, oldData.length, src.remaining()); - System.arraycopy(oldData,0,_data,0,oldData.length); - src.duplicate().get(_data, oldData.length, src.remaining()); + _messageDataRef.setData(data); } } @@ -1303,34 +1485,90 @@ public abstract class AbstractJDBCMessageStore implements MessageStore @Override public int getContent(int offsetInMessage, ByteBuffer dst) { - byte[] data = _dataRef == null ? null : _dataRef.get(); - if(data != null) - { - int length = Math.min(dst.remaining(), data.length - offsetInMessage); - dst.put(data, offsetInMessage, length); - return length; - } - else + byte[] data = _messageDataRef.getData(); + + if(data == null) { - checkMessageStoreOpen(); - return AbstractJDBCMessageStore.this.getContent(_messageId, offsetInMessage, dst); + if(stored()) + { + checkMessageStoreOpen(); + getLogger().debug("GET CONTENT for message id " + _messageId); + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } } + + int length = Math.min(dst.remaining(), data.length - offsetInMessage); + dst.put(data, offsetInMessage, length); + return length; + } @Override public ByteBuffer getContent(int offsetInMessage, int size) { - ByteBuffer buf = ByteBuffer.allocate(size); - int length = getContent(offsetInMessage, buf); - buf.position(0); - buf.limit(length); - return buf; + byte[] data = _messageDataRef.getData(); + + if(data == null) + { + + if(stored()) + { + checkMessageStoreOpen(); + + data = AbstractJDBCMessageStore.this.getAllContent(_messageId); + T metaData = _messageDataRef.getMetaData(); + if (metaData == null) + { + try + { + metaData = (T) AbstractJDBCMessageStore.this.getMetaData(_messageId); + _messageDataRef = new MessageDataSoftRef<>(metaData, data); + } + catch (SQLException e) + { + throw new StoreException(e); + } + } + else + { + _messageDataRef.setData(data); + } + } + else + { + data = new byte[0]; + } + } + return ByteBuffer.wrap(data,offsetInMessage,size); + } @Override public void remove() { + getLogger().debug("REMOVE called on message: " + _messageId); checkMessageStoreOpen(); int delta = getMetaData().getContentSize(); @@ -1338,32 +1576,69 @@ public abstract class AbstractJDBCMessageStore implements MessageStore storedSizeChange(-delta); } - private synchronized void store(final Connection conn) throws SQLException + private synchronized Runnable store(final Connection conn) throws SQLException { if (!stored()) { - try + getLogger().debug("STORING message id " + _messageId); + storeMetaData(conn, _messageId, _messageDataRef.getMetaData()); + AbstractJDBCMessageStore.this.addContent(conn, _messageId, + _messageDataRef.getData() == null + ? ByteBuffer.allocate(0) + : ByteBuffer.wrap(_messageDataRef.getData())); + + + if(getLogger().isDebugEnabled()) { - storeMetaData(conn, _messageId, _metaData); - AbstractJDBCMessageStore.this.addContent(conn, _messageId, - _data == null ? ByteBuffer.allocate(0) : ByteBuffer.wrap(_data)); + getLogger().debug("Storing message " + _messageId + " to store"); } - finally + + MessageDataRef<T> hardRef = _messageDataRef; + MessageDataSoftRef<T> messageDataSoftRef; + MessageData<T> ref; + do { - _metaData = null; - _data = null; + messageDataSoftRef = new MessageDataSoftRef<>(hardRef.getMetaData(), hardRef.getData()); + ref = messageDataSoftRef.get(); } + while (ref == null); - if(getLogger().isDebugEnabled()) + _messageDataRef = messageDataSoftRef; + + class Pointer implements Runnable { - getLogger().debug("Storing message " + _messageId + " to store"); + private MessageData<T> _ref; + + Pointer(final MessageData<T> ref) + { + getLogger().debug("POST COMMIT for message id " + _messageId); + _ref = ref; + } + + @Override + public void run() + { + _ref = null; + } } + return new Pointer(ref); + } + else + { + return new Runnable() + { + + @Override + public void run() + { + } + }; } } private boolean stored() { - return _metaData == null || _isRecovered; + return !_messageDataRef.isHardRef(); } } diff --git a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java index fbe402c4a1..df9ffda3a3 100644 --- a/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java +++ b/qpid/java/broker-core/src/main/java/org/apache/qpid/server/virtualhostnode/AbstractStandardVirtualHostNode.java @@ -90,7 +90,7 @@ public abstract class AbstractStandardVirtualHostNode<X extends AbstractStandard if (host == null) { - boolean hasBlueprint = getContext().containsKey(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); + boolean hasBlueprint = getContextKeys().contains(VIRTUALHOST_BLUEPRINT_CONTEXT_VAR); boolean blueprintUtilised = getContext().containsKey(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR) && Boolean.parseBoolean(String.valueOf(getContext().get(VIRTUALHOST_BLUEPRINT_UTILISED_CONTEXT_VAR))); diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java index 4b37823898..ae6d607102 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQChannel.java @@ -1249,8 +1249,6 @@ public class AMQChannel<T extends AMQProtocolSession<T>> final BasicContentHeaderProperties properties = incomingMessage.getContentHeader().getProperties(); - long expiration = properties.getExpiration(); - message.setExpiration(expiration); return message; } diff --git a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java index 0ed63daf7c..869de2f3a5 100644 --- a/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java +++ b/qpid/java/broker-plugins/amqp-0-8-protocol/src/main/java/org/apache/qpid/server/protocol/v0_8/AMQMessage.java @@ -29,8 +29,6 @@ import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; -import java.nio.ByteBuffer; - /** * A deliverable message. */ @@ -39,10 +37,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); - /** Flag to indicate that this message requires 'immediate' delivery. */ - - private long _expiration; - private final long _size; public AMQMessage(StoredMessage<MessageMetaData> handle) @@ -56,11 +50,6 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet _size = handle.getMetaData().getContentSize(); } - public void setExpiration(final long expiration) - { - _expiration = expiration; - } - public MessageMetaData getMessageMetaData() { return getStoredMessage().getMetaData(); @@ -110,16 +99,14 @@ public class AMQMessage extends AbstractServerMessageImpl<AMQMessage, MessageMet return getMessagePublishInfo().isImmediate(); } - public boolean isMandatory() { return getMessagePublishInfo().isMandatory(); } - public long getExpiration() { - return _expiration; + return getMessageHeader().getExpiration(); } diff --git a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java index 5c8eb94d91..e39a35f9a4 100644 --- a/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java +++ b/qpid/java/broker-plugins/derby-store/src/test/java/org/apache/qpid/server/store/derby/DerbyMessageStoreQuotaEventsTest.java @@ -20,6 +20,9 @@ */ package org.apache.qpid.server.store.derby; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.util.Collections; import java.util.Map; @@ -28,9 +31,6 @@ import org.apache.qpid.server.store.MessageStore; import org.apache.qpid.server.store.MessageStoreQuotaEventsTestBase; import org.apache.qpid.server.virtualhost.derby.DerbyVirtualHost; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTestBase { private static final int NUMBER_OF_MESSAGES_TO_OVERFILL_STORE = 10; @@ -54,6 +54,7 @@ public class DerbyMessageStoreQuotaEventsTest extends MessageStoreQuotaEventsTes { final DerbyVirtualHost parent = mock(DerbyVirtualHost.class); when(parent.getContext()).thenReturn(createContextSettings()); + when(parent.getContextKeys()).thenReturn(Collections.emptySet()); when(parent.getStorePath()).thenReturn(storeLocation); when(parent.getStoreOverfullSize()).thenReturn(OVERFULL_SIZE); when(parent.getStoreUnderfullSize()).thenReturn(UNDERFULL_SIZE); diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java index 4f88e011fb..9453c135e9 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCConfigurationStore.java @@ -27,6 +27,7 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.Set; import java.util.concurrent.atomic.AtomicBoolean; import javax.security.auth.Subject; @@ -84,7 +85,7 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor JDBCSettings settings = (JDBCSettings)parent; _connectionURL = settings.getConnectionUrl(); - JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent.getContext()); + JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent); if (!details.isKnownVendor() && getLogger().isInfoEnabled()) { @@ -111,8 +112,13 @@ public class GenericJDBCConfigurationStore extends AbstractJDBCConfigurationStor try { - Map<String, String> providerAttributes = new HashMap(_parent.getContext()); - providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames()); + Map<String, String> providerAttributes = new HashMap<>(); + Set<String> providerAttributeNames = connectionProviderFactory.getProviderAttributeNames(); + providerAttributeNames.retainAll(parent.getContextKeys()); + for(String attr : providerAttributeNames) + { + providerAttributes.put(attr, parent.getContextValue(String.class, attr)); + } _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, settings.getUsername(), diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java index 4fde0a44c7..3304d01d86 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/GenericJDBCMessageStore.java @@ -28,6 +28,9 @@ import java.sql.ResultSet; import java.sql.SQLException; import java.util.HashMap; import java.util.Map; +import java.util.Set; + +import javax.security.auth.Subject; import org.apache.log4j.Logger; @@ -36,8 +39,6 @@ import org.apache.qpid.server.plugin.JDBCConnectionProviderFactory; import org.apache.qpid.server.security.SecurityManager; import org.apache.qpid.server.store.StoreException; -import javax.security.auth.Subject; - /** * Implementation of a MessageStore backed by a Generic JDBC Database. */ @@ -60,7 +61,7 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore JDBCSettings settings = (JDBCSettings)parent; _connectionURL = settings.getConnectionUrl(); - JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent.getContext()); + JDBCDetails details = JDBCDetails.getDetailsForJdbcUrl(_connectionURL, parent); if (!details.isKnownVendor() && getLogger().isInfoEnabled()) { @@ -90,9 +91,13 @@ public class GenericJDBCMessageStore extends GenericAbstractJDBCMessageStore try { - Map<String, String> providerAttributes = new HashMap(parent.getContext()); - providerAttributes.keySet().retainAll(connectionProviderFactory.getProviderAttributeNames()); - + Map<String, String> providerAttributes = new HashMap<>(); + Set<String> providerAttributeNames = connectionProviderFactory.getProviderAttributeNames(); + providerAttributeNames.retainAll(parent.getContextKeys()); + for(String attr : providerAttributeNames) + { + providerAttributes.put(attr, parent.getContextValue(String.class, attr)); + } _connectionProvider = connectionProviderFactory.getConnectionProvider(_connectionURL, settings.getUsername(), diff --git a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java index a74f852dfa..8cd4996033 100644 --- a/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java +++ b/qpid/java/broker-plugins/jdbc-store/src/main/java/org/apache/qpid/server/store/jdbc/JDBCDetails.java @@ -19,9 +19,15 @@ package org.apache.qpid.server.store.jdbc; +import java.util.AbstractMap; +import java.util.AbstractSet; import java.util.Collections; import java.util.HashMap; +import java.util.Iterator; import java.util.Map; +import java.util.Set; + +import org.apache.qpid.server.model.ConfiguredObject; public abstract class JDBCDetails { @@ -216,7 +222,75 @@ public abstract class JDBCDetails return result; } - + public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final ConfiguredObject<?> object) + { + final Set<String> contextKeys = object.getContextKeys(); + Map<String,String> mapConversion = new AbstractMap<String, String>() + { + @Override + public Set<Entry<String, String>> entrySet() + { + return new AbstractSet<Entry<String, String>>() + { + @Override + public Iterator<Entry<String, String>> iterator() + { + final Iterator<String> underlying = contextKeys.iterator(); + return new Iterator<Entry<String, String>>() + { + @Override + public boolean hasNext() + { + return underlying.hasNext(); + } + + @Override + public Entry<String, String> next() + { + final String key = underlying.next(); + final String value = object.getContextValue(String.class, key); + return new Entry<String,String>() + { + + @Override + public String getKey() + { + return key; + } + + @Override + public String getValue() + { + return value; + } + + @Override + public String setValue(final String value) + { + throw new UnsupportedOperationException(); + } + }; + + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } + }; + } + + @Override + public int size() + { + return contextKeys.size(); + } + }; + } + }; + return getDetailsForJdbcUrl(jdbcUrl, mapConversion); + } public static JDBCDetails getDetailsForJdbcUrl(String jdbcUrl, final Map<String, String> contextMap) { String[] components = jdbcUrl.split(":", 3); |
