diff options
| author | Martin Ritchie <ritchiem@apache.org> | 2009-03-25 18:39:25 +0000 |
|---|---|---|
| committer | Martin Ritchie <ritchiem@apache.org> | 2009-03-25 18:39:25 +0000 |
| commit | ed17fb88e7917782495cbca795eaf98878e48fe6 (patch) | |
| tree | 85a493d4e4873e5407bde677f7d478c73c2f18f3 /qpid/java | |
| parent | 19de7a84b5fae3b977593f0fb81095c1c2c78f55 (diff) | |
| download | qpid-python-ed17fb88e7917782495cbca795eaf98878e48fe6.tar.gz | |
QPID-1735 : Added Documentation to QueueBackingStore around thread safety of load/unload, Updated FQBS to adhere to the thread safety specified by the interface. QueueEntry was updated to return the AMQMessage from the load() to complete the getMessage() interface. As in a flowed state the message may be purged before a reference can be taken. Added new Test QueueEntryImplThreadingTest that should later be run for longer but aims to show that load always returns the message even when unloads are occuring asynchronously.
Commit from 0.5-release : r758388
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@758397 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
11 files changed, 258 insertions, 122 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java index 0e5a4efba6..a22eea2b5e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java @@ -157,10 +157,10 @@ public class FileQueueBackingStore implements QueueBackingStore { try { - input.close(); - // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it - // on disk until it has been deleted from the queue at that point we can be sure we won't need the data - //handle.delete(); + if (input != null) + { + input.close(); + } } catch (IOException e) { @@ -171,101 +171,123 @@ public class FileQueueBackingStore implements QueueBackingStore throw new UnableToRecoverMessageException(error); } + /** + * Thread safety is ensured here by synchronizing on the message object. + * + * This is safe as load() calls will fail until the first thread through here has created the file on disk + * and fully written the content. + * + * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation. + * + * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls + * to skip the writing. + * + * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus + * safely allowing any reference to the message to be cleared prompting a load call. + * + * @param message the message to unload + * @throws UnableToFlowMessageException + */ public void unload(AMQMessage message) throws UnableToFlowMessageException { - long messageId = message.getMessageId(); + //Synchorize on the message to ensure that one only thread can unload at a time. + // If a second unload is attempted then it will block until the unload has completed. + synchronized (message) + { + long messageId = message.getMessageId(); - File handle = getFileHandle(messageId); + File handle = getFileHandle(messageId); - //If we have written the data once then we don't need to do it again. - if (handle.exists()) - { - if (_log.isDebugEnabled()) + //If we have written the data once then we don't need to do it again. + if (handle.exists()) { - _log.debug("Message(ID:" + messageId + ") already unloaded."); + if (_log.isDebugEnabled()) + { + _log.debug("Message(ID:" + messageId + ") already unloaded."); + } + return; } - return; - } - if (_log.isInfoEnabled()) - { - _log.info("Unloading Message (ID:" + messageId + ")"); - } + if (_log.isInfoEnabled()) + { + _log.info("Unloading Message (ID:" + messageId + ")"); + } - ObjectOutputStream writer = null; - Exception error = null; + ObjectOutputStream writer = null; + Exception error = null; - try - { - writer = new ObjectOutputStream(new FileOutputStream(handle)); + try + { + writer = new ObjectOutputStream(new FileOutputStream(handle)); - writer.writeLong(message.getArrivalTime()); + writer.writeLong(message.getArrivalTime()); - MessagePublishInfo mpi = message.getMessagePublishInfo(); - writer.writeUTF(String.valueOf(mpi.getExchange())); - writer.writeUTF(String.valueOf(mpi.getRoutingKey())); - writer.writeBoolean(mpi.isMandatory()); - writer.writeBoolean(mpi.isImmediate()); - ContentHeaderBody chb = message.getContentHeaderBody(); + MessagePublishInfo mpi = message.getMessagePublishInfo(); + writer.writeUTF(String.valueOf(mpi.getExchange())); + writer.writeUTF(String.valueOf(mpi.getRoutingKey())); + writer.writeBoolean(mpi.isMandatory()); + writer.writeBoolean(mpi.isImmediate()); + ContentHeaderBody chb = message.getContentHeaderBody(); - // write out the content header body - final int bodySize = chb.getSize(); - byte[] underlying = new byte[bodySize]; - ByteBuffer buf = ByteBuffer.wrap(underlying); - chb.writePayload(buf); + // write out the content header body + final int bodySize = chb.getSize(); + byte[] underlying = new byte[bodySize]; + ByteBuffer buf = ByteBuffer.wrap(underlying); + chb.writePayload(buf); - writer.writeInt(bodySize); - writer.write(underlying, 0, bodySize); + writer.writeInt(bodySize); + writer.write(underlying, 0, bodySize); - int bodyCount = message.getBodyCount(); - writer.writeInt(bodyCount); + int bodyCount = message.getBodyCount(); + writer.writeInt(bodyCount); - //WriteContentBody - for (int index = 0; index < bodyCount; index++) - { - ContentChunk chunk = message.getContentChunk(index); - int length = chunk.getSize(); + //WriteContentBody + for (int index = 0; index < bodyCount; index++) + { + ContentChunk chunk = message.getContentChunk(index); + int length = chunk.getSize(); - byte[] chunk_underlying = new byte[length]; + byte[] chunk_underlying = new byte[length]; - ByteBuffer chunk_buf = chunk.getData(); + ByteBuffer chunk_buf = chunk.getData(); - chunk_buf.duplicate().rewind().get(chunk_underlying); + chunk_buf.duplicate().rewind().get(chunk_underlying); - writer.writeInt(length); - writer.write(chunk_underlying, 0, length); + writer.writeInt(length); + writer.write(chunk_underlying, 0, length); + } } - } - catch (FileNotFoundException e) - { - error = e; - } - catch (IOException e) - { - error = e; - } - finally - { - // In a FileNotFound situation writer will be null. - if (writer != null) + catch (FileNotFoundException e) { - try - { - writer.flush(); - writer.close(); - } - catch (IOException e) + error = e; + } + catch (IOException e) + { + error = e; + } + finally + { + // In a FileNotFound situation writer will be null. + if (writer != null) { - error = e; + try + { + writer.flush(); + writer.close(); + } + catch (IOException e) + { + error = e; + } } } - } - if (error != null) - { - _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); - handle.delete(); - throw new UnableToFlowMessageException(messageId, error); + if (error != null) + { + _log.error("Unable to unload message(" + messageId + ") to disk, restoring state."); + handle.delete(); + throw new UnableToFlowMessageException(messageId, error); + } } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java index 0c4b8a0b42..5e5901bcd7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java @@ -166,7 +166,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList { // If we've increased the minimum memory above what we have in memory then // we need to inhale more if there is more - if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) + if (!_disabled && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0) { startInhaler(); } @@ -204,7 +204,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryUnloadedUpdateMemory(QueueEntry queueEntry) { - if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) + if (!_disabled && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0) { _log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity()); } @@ -219,7 +219,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList */ public void entryLoadedUpdateMemory(QueueEntry queueEntry) { - if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) + if (!_disabled && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum) { _log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum); setFlowed(true); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java index 5efb95d0c0..5c65cb6424 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java @@ -26,8 +26,36 @@ import org.apache.commons.configuration.ConfigurationException; public interface QueueBackingStore { + /** + * Retrieve the message with a given ID + * + * This method must be thread safe. + * + * Multiple calls to load with a given messageId DO NOT need to return the same object. + * + * @param messageId the id of the message to retreive. + * @return + */ AMQMessage load(Long messageId); + /** + * Store a message in the BackingStore. + * + * This method must be thread safe understanding that multiple message objects may be the same data. + * + * Allowing a thread to return from this method means that it is safe to call load() + * + * Implementer guide: + * Until the message has been loaded the message references will all be the same object. + * + * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method + * until the message is fully on disk. This can be done by synchronising on message as initially it is always the + * same object. Only after a load has taken place will there be a discrepency. + * + * + * @param message the message to unload + * @throws UnableToFlowMessageException + */ void unload(AMQMessage message) throws UnableToFlowMessageException; void delete(Long messageId); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index f22220ada9..fb23edb3c5 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -226,7 +226,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept void unload(); - void load(); + AMQMessage load(); boolean isFlowed(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 1a3e08ab5c..b6e6365189 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -31,6 +31,7 @@ import java.util.Set; import java.util.concurrent.CopyOnWriteArraySet; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLongFieldUpdater; +import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReferenceFieldUpdater; public class QueueEntryImpl implements QueueEntry @@ -41,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry private final SimpleQueueEntryList _queueEntryList; - private AMQMessage _message; + private AtomicReference<AMQMessage> _messageRef; private boolean _redelivered; @@ -102,7 +103,7 @@ public class QueueEntryImpl implements QueueEntry public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message) { _queueEntryList = queueEntryList; - _message = message; + _messageRef = new AtomicReference<AMQMessage>(message); if (message != null) { _messageId = message.getMessageId(); @@ -136,11 +137,7 @@ public class QueueEntryImpl implements QueueEntry public AMQMessage getMessage() { - if (_message == null) - { - return _backingStore.load(_messageId); - } - return _message; + return load(); } public Long getMessageId() @@ -231,13 +228,17 @@ public class QueueEntryImpl implements QueueEntry public String debugIdentity() { String entry = "[State:" + _state.getState().name() + "]"; - if (_message == null) + + AMQMessage message = _messageRef.get(); + + if (message == null) { return entry + "(Message Unloaded ID:" + _messageId + ")"; } else { - return entry + _message.debugIdentity(); + + return entry + message.debugIdentity(); } } @@ -398,23 +399,27 @@ public class QueueEntryImpl implements QueueEntry public void unload() { - if (_message != null && _backingStore != null) - { + //Get the currently loaded message + AMQMessage message = _messageRef.get(); + // If we have a message in memory and we have a valid backingStore attempt to unload + if (message != null && _backingStore != null) + { try { - if (!_hasBeenUnloaded) + // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure + // multiple initial unloads are unloads + _backingStore.unload(message); + _hasBeenUnloaded = true; + _messageRef.set(null); + + if (_log.isDebugEnabled()) { - _hasBeenUnloaded = true; + _log.debug("Unloaded:" + debugIdentity()); + } - _backingStore.unload(_message); - if (_log.isDebugEnabled()) - { - _log.debug("Unloaded:" + debugIdentity()); - } - } - _message = null; + // Clear the message reference if the loaded message is still the one we are processing. //Update the memoryState if this load call resulted in the message being purged from memory if (!_flowed.getAndSet(true)) @@ -434,23 +439,56 @@ public class QueueEntryImpl implements QueueEntry } } - public void load() + public AMQMessage load() { + // MessageId and Backing store are null in test scenarios, normally this is not the case. if (_messageId != null && _backingStore != null) { - _message = _backingStore.load(_messageId); - - if (_log.isDebugEnabled()) + // See if we have the message currently in memory to return + AMQMessage message = _messageRef.get(); + // if we don't then we need to start a load process. + if (message == null) { - _log.debug("Loaded:" + debugIdentity()); - } + //Synchronize here to ensure only the first thread that attempts to load will perform the load from the + // backing store. + synchronized (this) + { + // Check again to see if someone else ahead of us loaded the message + message = _messageRef.get(); + // if we still don't have the message then we need to start a load process. + if (message == null) + { + // Load the message and keep a reference to it + message = _backingStore.load(_messageId); + // Set the message reference + _messageRef.set(message); + } + else + { + // If someone else loaded the message then we can jump out here as the Memory Updates will + // have been performed by the loading thread + return message; + } + } - //Update the memoryState if this load call resulted in the message comming in to memory - if (_flowed.getAndSet(false)) - { - _queueEntryList.entryLoadedUpdateMemory(this); + if (_log.isDebugEnabled()) + { + _log.debug("Loaded:" + debugIdentity()); + } + + //Update the memoryState if this load call resulted in the message comming in to memory + if (_flowed.getAndSet(false)) + { + _queueEntryList.entryLoadedUpdateMemory(this); + } } + + // Return the message that was either already in memory or the value we just loaded. + return message; } + // This can be null but only in the case where we have no messageId + // in the case where we have no backingStore then we will never have unloaded the message + return _messageRef.get(); } public boolean isFlowed() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 3ab127b59d..c370fd9867 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -32,6 +32,7 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.qpid.server.subscription.MockSubscription; +import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.AMQException; import java.util.Map; @@ -96,6 +97,12 @@ public class ExtractResendAndRequeueTest extends TestCase assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size()); } + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } + /** * Helper method to create a new subscription and aquire the given messages. * diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 22e49a0241..ee1796ba2f 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -36,11 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage; import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.FailedDequeueException; import org.apache.qpid.server.queue.IncomingMessage; -import org.apache.qpid.server.queue.MessageCleanupException; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.queue.SimpleAMQQueue; -import org.apache.qpid.server.queue.UnableToFlowMessageException; import org.apache.qpid.server.registry.ApplicationRegistry; import org.apache.qpid.server.store.MemoryMessageStore; import org.apache.qpid.server.store.SkeletonMessageStore; @@ -356,9 +354,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase //To change body of implemented methods use File | Settings | File Templates. } - public void load() + public AMQMessage load() { - //To change body of implemented methods use File | Settings | File Templates. + return null; } public boolean isFlowed() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java index 9344efd4a8..e7b3f40393 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java @@ -23,10 +23,18 @@ package org.apache.qpid.server.filter; import junit.framework.TestCase; import org.apache.qpid.AMQException; import org.apache.qpid.server.queue.MockQueueEntry; +import org.apache.qpid.server.registry.ApplicationRegistry; public class PropertyExpressionTest extends TestCase { + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockQueueEntry will create + ApplicationRegistry.remove(1); + } + + public void testJMSRedelivered() { PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java index b38da53406..11049a7ae3 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java @@ -23,6 +23,14 @@ package org.apache.qpid.server.queue; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.AMQException; import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.ContentHeaderBody; +import org.apache.qpid.framing.BasicContentHeaderProperties; +import org.apache.qpid.framing.BasicPublishBody; +import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl; + +import java.util.LinkedList; +import java.util.ArrayList; public class MockAMQMessage extends TransientAMQMessage { @@ -31,6 +39,14 @@ public class MockAMQMessage extends TransientAMQMessage { super(messageId); _messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null); + BasicContentHeaderProperties properties = new BasicContentHeaderProperties(); + + properties.setMessageId(String.valueOf(messageId)); + properties.setTimestamp(System.currentTimeMillis()); + properties.setDeliveryMode((byte)1); + + _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID); + _contentBodies = new ArrayList<ContentChunk>(); } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java index d9e4cc9b70..ff814840bc 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java @@ -20,16 +20,18 @@ */ package org.apache.qpid.server.queue; +import org.apache.commons.configuration.PropertiesConfiguration; +import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.framing.FieldTable; -import org.apache.qpid.server.virtualhost.VirtualHost; -import org.apache.qpid.server.configuration.QueueConfiguration; +import org.apache.qpid.server.configuration.ServerConfiguration; +import org.apache.qpid.server.configuration.VirtualHostConfiguration; import org.apache.qpid.server.exchange.Exchange; -import org.apache.qpid.server.subscription.Subscription; -import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.management.ManagedObject; -import org.apache.qpid.AMQException; -import org.apache.commons.configuration.Configuration; +import org.apache.qpid.server.store.StoreContext; +import org.apache.qpid.server.subscription.Subscription; +import org.apache.qpid.server.virtualhost.VirtualHost; +import org.apache.qpid.server.registry.ApplicationRegistry; import java.util.List; import java.util.Set; @@ -39,10 +41,20 @@ public class MockAMQQueue implements AMQQueue private boolean _deleted = false; private int _queueCount; private AMQShortString _name; + private VirtualHost _virtualhost; public MockAMQQueue(String name) { - _name = new AMQShortString(name); + _name = new AMQShortString(name); + _virtualhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test"); + try + { + _virtualhost.getQueueRegistry().registerQueue(this); + } + catch (AMQException e) + { + e.printStackTrace(); + } } public AMQShortString getName() @@ -67,7 +79,7 @@ public class MockAMQQueue implements AMQQueue public VirtualHost getVirtualHost() { - return null; //To change body of implemented methods use File | Settings | File Templates. + return _virtualhost; } public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException @@ -152,7 +164,7 @@ public class MockAMQQueue implements AMQQueue public int delete() throws AMQException { - return 0; //To change body of implemented methods use File | Settings | File Templates. + return 0; //To change body of implemented methods use File | Settings | File Templates. } public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException @@ -343,7 +355,7 @@ public class MockAMQQueue implements AMQQueue public void setMinimumAlertRepeatGap(long value) { - + } } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java index 6c6835ccca..abcd9855d9 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java @@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.AMQQueue; import org.apache.qpid.server.queue.MockAMQQueue; import org.apache.qpid.server.queue.MockProtocolSession; import org.apache.qpid.server.store.TestableMemoryMessageStore; +import org.apache.qpid.server.registry.ApplicationRegistry; public class ACLManagerTest extends TestCase { @@ -67,6 +68,12 @@ public class ACLManagerTest extends TestCase _session = new MockProtocolSession(new TestableMemoryMessageStore()); } + + public void tearDown() throws Exception + { + //Ensure we close the registry that the MockAMQQueue will create + ApplicationRegistry.getInstance().close(); + } public void testACLManagerConfigurationPluginManager() throws Exception { |
