diff options
Diffstat (limited to 'java')
11 files changed, 258 insertions, 122 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java index 0e5a4efba6..a22eea2b5e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java @@ -157,10 +157,10 @@ public class FileQueueBackingStore implements QueueBackingStore { try { - input.close(); - // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it - // on disk until it has been deleted from the queue at that point we can be sure we won't need the data - //handle.delete(); + if (input != null) + { + input.close(); + } } catch (IOException e) { @@ -171,101 +171,123 @@ public class FileQueueBackingStore implements QueueBackingStore throw new UnableToRecoverMessageException(error); } + /** + * Thread safety is ensured here by synchronizing on the message object. + * + * This is safe as load() calls will fail until the first thread through here has created the file on disk + * and fully written the content. + * + * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation. + * + * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls + * to skip the writing. + * + * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus + * safely allowing any reference to the message to be cleared prompting a load call. + * + * @param message the message to unload + * @throws UnableToFlowMessageException + */ public void unload(AMQMessage message) throws UnableToFlowMessageException { - long messageId = message.getMessageId(); + //Synchorize on the message to ensure that one only thread can unload at a time. + // If a second unload is attempted then it will block until the unload has completed. + synchronized (message) + { + long messageId = message.getMessageId(); - File handle = getFileHandle(messageId); + File handle = getFileHandle(messageId); - //If we have written the data once then we don't need to do it again. - if (handle.exists()) - { - if (_log.isDebugEnabled()) + //If we have written the data once then we don't need to do it again. + if (handle.exists()) { - _log.debug("Message(ID:" + messageId + ") already unloaded."); + if (_log.isDebugEnabled()) + { + _log.debug("Message(ID:" + messageId + ") already unloaded."); + } + return; } - return; - } - if (_log.isInfoEnabled()) - { - _log.info("Unloading Message (ID:" + messageId + ")"); - } + if (_log.isInfoEnabled()) + { + _log.info("Unloading Message (ID:" + messageId + ")"); + } - ObjectOutputStream writer = null; - Exception error = null; + ObjectOutputStream writer = null; + Exception error = null; - try - { - writer = new ObjectOutputStream(new FileOutputStream(handle)); + try + { + writer = new ObjectOutputStream(new FileOutputStream(handle)); - writer.writeLong(message.getArrivalTime()); + writer.writeLong(message.getArrivalTime()); - MessagePublishInfo mpi = message.getMessagePublishInfo(); - writer.writeUTF(String.valueOf(mpi.getExchange())); - writer.writeUTF(String.valueOf(mpi.getRoutingKey())); - writer.writeBoolean(mpi.isMandatory()); - writer.writeBoolean(mpi.isImmediate()); - ContentHeaderBody chb = message.getContentHeaderBody(); + MessagePublishInfo mpi = message.getMessagePublishInfo(); + writer.writeUTF(String.valueOf(mpi.getExchange())); + writer.writeUTF(String.valueOf(mpi.getRoutingKey())); + writer.writeBoolean(mpi.isMandatory()); + writer.writeBoolean(mpi.isImmediate()); + ContentHeaderBody chb = message.getContentHeaderBody(); - // write out the content header body - final int bodySize = chb.getSize(); - byte[] underlying = new byte[bodySize]; - ByteBuffer buf = ByteBuffer.wrap(underlying); - chb.writePayload(buf); + // write out the content header body + final int bodySize = chb.getSize(); + byte[] underlying = new byte[bodySize]; + ByteBuffer buf = ByteBuffer.wrap(underlying); + chb.writePayload(buf); - writer.writeInt(bodySize); - writer.write(underlying, 0, bodySize); + writer.writeInt(bodySize); + writer.write(underlying, 0, bodySize); - int bodyCount = message.getBodyCount(); - writer.writeInt(bodyCount); + int bodyCount = message.getBodyCount(); + writer.writeInt(bodyCount); - //WriteContentBody - for (int index = 0; index < bodyCount; index++) - { - ContentChunk chunk = message.getContentChunk(index); - int length = chunk.getSize(); + //WriteContentBody + for (int index = 0; index < bodyCount; index++) + { + ContentChunk chunk = message.getContentChunk(index); + int length = chunk.getSize(); - byte[] chunk_underlying = new byte[length]; + byte[] chunk_underlying = new byte[length]; - ByteBuffer chunk_buf = chunk.getData(); + ByteBuffer chunk_buf = chunk.getData(); - chunk_buf.duplicate().rewind().get(chunk_underlying); + chunk_buf.duplicate().rewind().get(chunk_underlying); - writer.writeInt(length); - writer.write(chunk_underlying, 0, length); + writer.writeInt(length); + writer.write(chunk_underlying, 0, length); + } } - } - catch (FileNotFoundException e) - { - error = e; - } - catch (IOException e) - { - error = e; - } - finally - { - // In a FileNotFound situation writer will be null. - if (writer != null) + catch (FileNotFoundException e) { - try - { - writer.flush(); - writer.close(); - } - catch (IOException e) + error = e; + } + catch (IOException e) + { + error = e; + } + finally + { + // In a FileNotFound situation writer will be null. + if (writer != null) { - error = e; + try + { + writer.flush(); + writer.close(); + } + catch (IOException e) + { + error = e; + } } } - } - if (error != null) - { - _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); - handle.delete(); - throw new UnableToFlowMessageException(messageId, error); + if (error != null) + { + _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); + handle.delete(); + throw new UnableToFlowMessageException(messageId, error); + } } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 0c4b8a0b42..5e5901bcd7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -166,7 +166,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList { // If we've increased the minimum memory above what we have in memory then // we need to inhale more if there is more - if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) + if (!_disabled && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) { startInhaler(); } @@ -204,7 +204,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryUnloadedUpdateMemory(QueueEntry queueEntry) { - if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + if (!_disabled && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) { _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); } @@ -219,7 +219,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryLoadedUpdateMemory(QueueEntry queueEntry) { - if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (!_disabled && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) { _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); setFlowed(true); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java index 5efb95d0c0..5c65cb6424 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java @@ -26,8 +26,36 @@ import org.apache.commons.configuration.ConfigurationException; public interface QueueBackingStore { + /** + * Retrieve the message with a given ID + * + * This method must be thread safe. + * + * Multiple calls to load with a given messageId DO NOT need to return the same object. + * + * @param messageId the id of the message to retreive. + * @return + */ AMQMessage load(Long messageId); + /** + * Store a message in the BackingStore. + * + * This method must be thread safe understanding that multiple message objects may be the same data. + * + * Allowing a thread to return from this method means that it is safe to call load() + * + * Implementer guide: + * Until the message has been loaded the message references will all be the same object. + * + * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method + * until the message is fully on disk. This can be done by synchronising on message as initially it is always the + * same object. Only after a load has taken place will there be a discrepency. + * + * + * @param message the message to unload + * @throws UnableToFlowMessageException + */ void unload(AMQMessage message) throws UnableToFlowMessageException; void delete(Long messageId); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index f22220ada9..fb23edb3c5 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -226,7 +226,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void unload(); - void load(); + AMQMessage load(); boolean isFlowed(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1a3e08ab5c..b6e6365189 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class QueueEntryImpl implements QueueEntry @@ -41,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private AtomicReference<AMQMessage> _messageRef; private boolean _redelivered; @@ -102,7 +103,7 @@ public class QueueEntryImpl implements QueueEntry public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) { _queueEntryList = queueEntryList; - _message = message; + _messageRef = new AtomicReference<AMQMessage>(message); if (message != null) { _messageId = message.getMessageId(); @@ -136,11 +137,7 @@ public class QueueEntryImpl implements QueueEntry public AMQMessage getMessage() { - if (_message == null) - { - return _backingStore.load(_messageId); - } - return _message; + return load(); } public Long getMessageId() @@ -231,13 +228,17 @@ public class QueueEntryImpl implements QueueEntry public String debugIdentity() { String entry = "[State:" + _state.getState().name() + "]"; - if (_message == null) + + AMQMessage message = _messageRef.get(); + + if (message == null) { return entry + "(Message Unloaded ID:" + _messageId + ")"; } else { - return entry + _message.debugIdentity(); + + return entry + message.debugIdentity(); } } @@ -398,23 +399,27 @@ public class QueueEntryImpl implements QueueEntry public void unload() { - if (_message != null && _backingStore != null) - { + //Get the currently loaded message + AMQMessage message = _messageRef.get(); + // If we have a message in memory and we have a valid backingStore attempt to unload + if (message != null && _backingStore != null) + { try { - if (!_hasBeenUnloaded) + // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure + // multiple initial unloads are unloads + _backingStore.unload(message); + _hasBeenUnloaded = true; + _messageRef.set(null); + + if (_log.isDebugEnabled()) { - _hasBeenUnloaded = true; + _log.debug("Unloaded:" + debugIdentity()); + } - _backingStore.unload(_message); - if (_log.isDebugEnabled()) - { - _log.debug("Unloaded:" + debugIdentity()); - } - } - _message = null; + // Clear the message reference if the loaded message is still the one we are processing. //Update the memoryState if this load call resulted in the message being purged from memory if (!_flowed.getAndSet(true)) @@ -434,23 +439,56 @@ public class QueueEntryImpl implements QueueEntry } } - public void load() + public AMQMessage load() { + // MessageId and Backing store are null in test scenarios, normally this is not the case. if (_messageId != null && _backingStore != null) { - _message = _backingStore.load(_messageId); - - if (_log.isDebugEnabled()) + // See if we have the message currently in memory to return + AMQMessage message = _messageRef.get(); + // if we don't then we need to start a load process. + if (message == null) { - _log.debug("Loaded:" + debugIdentity()); - } + //Synchronize here to ensure only the first thread that attempts to load will perform the load from the + // backing store. + synchronized (this) + { + // Check again to see if someone else ahead of us loaded the message + message = _messageRef.get(); + // if we still don't have the message then we need to start a load process. + if (message == null) + { + // Load the message and keep a reference to it + message = _backingStore.load(_messageId); + // Set the message reference + _messageRef.set(message); + } + else + { + // If someone else loaded the message then we can jump out here as the Memory Updates will + // have been performed by the loading thread + return message; + } + } - //Update the memoryState if this load call resulted in the message comming in to memory - if (_flowed.getAndSet(false)) - { - _queueEntryList.entryLoadedUpdateMemory(this); + if (_log.isDebugEnabled()) + { + _log.debug("Loaded:" + debugIdentity()); + } + + //Update the memoryState if this load call resulted in the message comming in to memory + if (_flowed.getAndSet(false)) + { + _queueEntryList.entryLoadedUpdateMemory(this); + } } + + // Return the message that was either already in memory or the value we just loaded. + return message; } + // This can be null but only in the case where we have no messageId + // in the case where we have no backingStore then we will never have unloaded the message + return _messageRef.get(); } public boolean isFlowed() diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 3ab127b59d..c370fd9867 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import java.util.Map; @@ -96,6 +97,12 @@ public class ExtractResendAndRequeueTest extends TestCase assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } + /** * Helper method to create a new subscription and aquire the given messages. * diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 22e49a0241..ee1796ba2f 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -36,11 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.FailedDequeueException; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageCleanupException; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.UnableToFlowMessageException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -356,9 +354,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void load() + public AMQMessage load() { - //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isFlowed() diff --git a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java index 9344efd4a8..e7b3f40393 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java @@ -23,10 +23,18 @@ package org.apache.qpid.server.filter; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.MockQueueEntry; +import org.apache.qpid.server.registry.ApplicationRegistry; public class PropertyExpressionTest extends TestCase { + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockQueueEntry will create + ApplicationRegistry.remove(1); + } + + public void testJMSRedelivered() { PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered"); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index b38da53406..11049a7ae3 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -23,6 +23,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; + +import java.util.LinkedList; +import java.util.ArrayList; public class MockAMQMessage extends TransientAMQMessage { @@ -31,6 +39,14 @@ public class MockAMQMessage extends TransientAMQMessage { super(messageId); _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null); + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setMessageId(String.valueOf(messageId)); + properties.setTimestamp(System.currentTimeMillis()); + properties.setDeliveryMode((byte)1); + + _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID); + _contentBodies = new ArrayList<ContentChunk>(); } diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index d9e4cc9b70..ff814840bc 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.AMQException; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; import java.util.List; import java.util.Set; @@ -39,10 +41,20 @@ public class MockAMQQueue implements AMQQueue private boolean _deleted = false; private int _queueCount; private AMQShortString _name; + private VirtualHost _virtualhost; public MockAMQQueue(String name) { - _name = new AMQShortString(name); + _name = new AMQShortString(name); + _virtualhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + try + { + _virtualhost.getQueueRegistry().registerQueue(this); + } + catch (AMQException e) + { + e.printStackTrace(); + } } public AMQShortString getName() @@ -67,7 +79,7 @@ public class MockAMQQueue implements AMQQueue public VirtualHost getVirtualHost() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _virtualhost; } public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -152,7 +164,7 @@ public class MockAMQQueue implements AMQQueue public int delete() throws AMQException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; //To change body of implemented methods use File | Settings | File Templates. } public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException @@ -343,7 +355,7 @@ public class MockAMQQueue implements AMQQueue public void setMinimumAlertRepeatGap(long value) { - + } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index 6c6835ccca..abcd9855d9 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase { @@ -67,6 +68,12 @@ public class ACLManagerTest extends TestCase _session = new MockProtocolSession(new TestableMemoryMessageStore()); } + + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } public void testACLManagerConfigurationPluginManager() throws Exception { |
