summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java170
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java6
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java28
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java98
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java7
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java6
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java8
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java16
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java32
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java7
11 files changed, 258 insertions, 122 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
index 0e5a4efba6..a22eea2b5e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FileQueueBackingStore.java
@@ -157,10 +157,10 @@ public class FileQueueBackingStore implements QueueBackingStore
{
try
{
- input.close();
- // We can purge the message here then reflow it if required but I believe it to be cleaner to leave it
- // on disk until it has been deleted from the queue at that point we can be sure we won't need the data
- //handle.delete();
+ if (input != null)
+ {
+ input.close();
+ }
}
catch (IOException e)
{
@@ -171,101 +171,123 @@ public class FileQueueBackingStore implements QueueBackingStore
throw new UnableToRecoverMessageException(error);
}
+ /**
+ * Thread safety is ensured here by synchronizing on the message object.
+ *
+ * This is safe as load() calls will fail until the first thread through here has created the file on disk
+ * and fully written the content.
+ *
+ * After this point new AMQMessages can exist that reference the same data thus breaking the synchronisation.
+ *
+ * Thread safety is maintained here as the existence of the file is checked allowing then subsequent unload() calls
+ * to skip the writing.
+ *
+ * Multiple unload() calls will initially be blocked using the synchronization until the data exists on disk thus
+ * safely allowing any reference to the message to be cleared prompting a load call.
+ *
+ * @param message the message to unload
+ * @throws UnableToFlowMessageException
+ */
public void unload(AMQMessage message) throws UnableToFlowMessageException
{
- long messageId = message.getMessageId();
+ //Synchorize on the message to ensure that one only thread can unload at a time.
+ // If a second unload is attempted then it will block until the unload has completed.
+ synchronized (message)
+ {
+ long messageId = message.getMessageId();
- File handle = getFileHandle(messageId);
+ File handle = getFileHandle(messageId);
- //If we have written the data once then we don't need to do it again.
- if (handle.exists())
- {
- if (_log.isDebugEnabled())
+ //If we have written the data once then we don't need to do it again.
+ if (handle.exists())
{
- _log.debug("Message(ID:" + messageId + ") already unloaded.");
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Message(ID:" + messageId + ") already unloaded.");
+ }
+ return;
}
- return;
- }
- if (_log.isInfoEnabled())
- {
- _log.info("Unloading Message (ID:" + messageId + ")");
- }
+ if (_log.isInfoEnabled())
+ {
+ _log.info("Unloading Message (ID:" + messageId + ")");
+ }
- ObjectOutputStream writer = null;
- Exception error = null;
+ ObjectOutputStream writer = null;
+ Exception error = null;
- try
- {
- writer = new ObjectOutputStream(new FileOutputStream(handle));
+ try
+ {
+ writer = new ObjectOutputStream(new FileOutputStream(handle));
- writer.writeLong(message.getArrivalTime());
+ writer.writeLong(message.getArrivalTime());
- MessagePublishInfo mpi = message.getMessagePublishInfo();
- writer.writeUTF(String.valueOf(mpi.getExchange()));
- writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
- writer.writeBoolean(mpi.isMandatory());
- writer.writeBoolean(mpi.isImmediate());
- ContentHeaderBody chb = message.getContentHeaderBody();
+ MessagePublishInfo mpi = message.getMessagePublishInfo();
+ writer.writeUTF(String.valueOf(mpi.getExchange()));
+ writer.writeUTF(String.valueOf(mpi.getRoutingKey()));
+ writer.writeBoolean(mpi.isMandatory());
+ writer.writeBoolean(mpi.isImmediate());
+ ContentHeaderBody chb = message.getContentHeaderBody();
- // write out the content header body
- final int bodySize = chb.getSize();
- byte[] underlying = new byte[bodySize];
- ByteBuffer buf = ByteBuffer.wrap(underlying);
- chb.writePayload(buf);
+ // write out the content header body
+ final int bodySize = chb.getSize();
+ byte[] underlying = new byte[bodySize];
+ ByteBuffer buf = ByteBuffer.wrap(underlying);
+ chb.writePayload(buf);
- writer.writeInt(bodySize);
- writer.write(underlying, 0, bodySize);
+ writer.writeInt(bodySize);
+ writer.write(underlying, 0, bodySize);
- int bodyCount = message.getBodyCount();
- writer.writeInt(bodyCount);
+ int bodyCount = message.getBodyCount();
+ writer.writeInt(bodyCount);
- //WriteContentBody
- for (int index = 0; index < bodyCount; index++)
- {
- ContentChunk chunk = message.getContentChunk(index);
- int length = chunk.getSize();
+ //WriteContentBody
+ for (int index = 0; index < bodyCount; index++)
+ {
+ ContentChunk chunk = message.getContentChunk(index);
+ int length = chunk.getSize();
- byte[] chunk_underlying = new byte[length];
+ byte[] chunk_underlying = new byte[length];
- ByteBuffer chunk_buf = chunk.getData();
+ ByteBuffer chunk_buf = chunk.getData();
- chunk_buf.duplicate().rewind().get(chunk_underlying);
+ chunk_buf.duplicate().rewind().get(chunk_underlying);
- writer.writeInt(length);
- writer.write(chunk_underlying, 0, length);
+ writer.writeInt(length);
+ writer.write(chunk_underlying, 0, length);
+ }
}
- }
- catch (FileNotFoundException e)
- {
- error = e;
- }
- catch (IOException e)
- {
- error = e;
- }
- finally
- {
- // In a FileNotFound situation writer will be null.
- if (writer != null)
+ catch (FileNotFoundException e)
{
- try
- {
- writer.flush();
- writer.close();
- }
- catch (IOException e)
+ error = e;
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
+ finally
+ {
+ // In a FileNotFound situation writer will be null.
+ if (writer != null)
{
- error = e;
+ try
+ {
+ writer.flush();
+ writer.close();
+ }
+ catch (IOException e)
+ {
+ error = e;
+ }
}
}
- }
- if (error != null)
- {
- _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
- handle.delete();
- throw new UnableToFlowMessageException(messageId, error);
+ if (error != null)
+ {
+ _log.error("Unable to unload message(" + messageId + ") to disk, restoring state.");
+ handle.delete();
+ throw new UnableToFlowMessageException(messageId, error);
+ }
}
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
index 0c4b8a0b42..5e5901bcd7 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/FlowableBaseQueueEntryList.java
@@ -166,7 +166,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
{
// If we've increased the minimum memory above what we have in memory then
// we need to inhale more if there is more
- if (_atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
+ if (!_disabled && _atomicQueueInMemory.get() < _memoryUsageMinimum && _atomicQueueSize.get() > 0)
{
startInhaler();
}
@@ -204,7 +204,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryUnloadedUpdateMemory(QueueEntry queueEntry)
{
- if (_atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
+ if (!_disabled && _atomicQueueInMemory.addAndGet(-queueEntry.getSize()) < 0)
{
_log.error("InMemory Count just went below 0:" + queueEntry.debugIdentity());
}
@@ -219,7 +219,7 @@ public abstract class FlowableBaseQueueEntryList implements QueueEntryList
*/
public void entryLoadedUpdateMemory(QueueEntry queueEntry)
{
- if (_atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
+ if (!_disabled && _atomicQueueInMemory.addAndGet(queueEntry.getSize()) > _memoryUsageMaximum)
{
_log.error("Loaded to much data!:" + _atomicQueueInMemory.get() + "/" + _memoryUsageMaximum);
setFlowed(true);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
index 5efb95d0c0..5c65cb6424 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueBackingStore.java
@@ -26,8 +26,36 @@ import org.apache.commons.configuration.ConfigurationException;
public interface QueueBackingStore
{
+ /**
+ * Retrieve the message with a given ID
+ *
+ * This method must be thread safe.
+ *
+ * Multiple calls to load with a given messageId DO NOT need to return the same object.
+ *
+ * @param messageId the id of the message to retreive.
+ * @return
+ */
AMQMessage load(Long messageId);
+ /**
+ * Store a message in the BackingStore.
+ *
+ * This method must be thread safe understanding that multiple message objects may be the same data.
+ *
+ * Allowing a thread to return from this method means that it is safe to call load()
+ *
+ * Implementer guide:
+ * Until the message has been loaded the message references will all be the same object.
+ *
+ * One appraoch as taken by the @see FileQueueBackingStore is to block aimulataneous calls to this method
+ * until the message is fully on disk. This can be done by synchronising on message as initially it is always the
+ * same object. Only after a load has taken place will there be a discrepency.
+ *
+ *
+ * @param message the message to unload
+ * @throws UnableToFlowMessageException
+ */
void unload(AMQMessage message) throws UnableToFlowMessageException;
void delete(Long messageId);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index f22220ada9..fb23edb3c5 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -226,7 +226,7 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
void unload();
- void load();
+ AMQMessage load();
boolean isFlowed();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 1a3e08ab5c..b6e6365189 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -31,6 +31,7 @@ import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLongFieldUpdater;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.atomic.AtomicReferenceFieldUpdater;
public class QueueEntryImpl implements QueueEntry
@@ -41,7 +42,7 @@ public class QueueEntryImpl implements QueueEntry
private final SimpleQueueEntryList _queueEntryList;
- private AMQMessage _message;
+ private AtomicReference<AMQMessage> _messageRef;
private boolean _redelivered;
@@ -102,7 +103,7 @@ public class QueueEntryImpl implements QueueEntry
public QueueEntryImpl(SimpleQueueEntryList queueEntryList, AMQMessage message)
{
_queueEntryList = queueEntryList;
- _message = message;
+ _messageRef = new AtomicReference<AMQMessage>(message);
if (message != null)
{
_messageId = message.getMessageId();
@@ -136,11 +137,7 @@ public class QueueEntryImpl implements QueueEntry
public AMQMessage getMessage()
{
- if (_message == null)
- {
- return _backingStore.load(_messageId);
- }
- return _message;
+ return load();
}
public Long getMessageId()
@@ -231,13 +228,17 @@ public class QueueEntryImpl implements QueueEntry
public String debugIdentity()
{
String entry = "[State:" + _state.getState().name() + "]";
- if (_message == null)
+
+ AMQMessage message = _messageRef.get();
+
+ if (message == null)
{
return entry + "(Message Unloaded ID:" + _messageId + ")";
}
else
{
- return entry + _message.debugIdentity();
+
+ return entry + message.debugIdentity();
}
}
@@ -398,23 +399,27 @@ public class QueueEntryImpl implements QueueEntry
public void unload()
{
- if (_message != null && _backingStore != null)
- {
+ //Get the currently loaded message
+ AMQMessage message = _messageRef.get();
+ // If we have a message in memory and we have a valid backingStore attempt to unload
+ if (message != null && _backingStore != null)
+ {
try
{
- if (!_hasBeenUnloaded)
+ // The backingStore will now handle concurrent calls to unload and safely synchronize to ensure
+ // multiple initial unloads are unloads
+ _backingStore.unload(message);
+ _hasBeenUnloaded = true;
+ _messageRef.set(null);
+
+ if (_log.isDebugEnabled())
{
- _hasBeenUnloaded = true;
+ _log.debug("Unloaded:" + debugIdentity());
+ }
- _backingStore.unload(_message);
- if (_log.isDebugEnabled())
- {
- _log.debug("Unloaded:" + debugIdentity());
- }
- }
- _message = null;
+ // Clear the message reference if the loaded message is still the one we are processing.
//Update the memoryState if this load call resulted in the message being purged from memory
if (!_flowed.getAndSet(true))
@@ -434,23 +439,56 @@ public class QueueEntryImpl implements QueueEntry
}
}
- public void load()
+ public AMQMessage load()
{
+ // MessageId and Backing store are null in test scenarios, normally this is not the case.
if (_messageId != null && _backingStore != null)
{
- _message = _backingStore.load(_messageId);
-
- if (_log.isDebugEnabled())
+ // See if we have the message currently in memory to return
+ AMQMessage message = _messageRef.get();
+ // if we don't then we need to start a load process.
+ if (message == null)
{
- _log.debug("Loaded:" + debugIdentity());
- }
+ //Synchronize here to ensure only the first thread that attempts to load will perform the load from the
+ // backing store.
+ synchronized (this)
+ {
+ // Check again to see if someone else ahead of us loaded the message
+ message = _messageRef.get();
+ // if we still don't have the message then we need to start a load process.
+ if (message == null)
+ {
+ // Load the message and keep a reference to it
+ message = _backingStore.load(_messageId);
+ // Set the message reference
+ _messageRef.set(message);
+ }
+ else
+ {
+ // If someone else loaded the message then we can jump out here as the Memory Updates will
+ // have been performed by the loading thread
+ return message;
+ }
+ }
- //Update the memoryState if this load call resulted in the message comming in to memory
- if (_flowed.getAndSet(false))
- {
- _queueEntryList.entryLoadedUpdateMemory(this);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Loaded:" + debugIdentity());
+ }
+
+ //Update the memoryState if this load call resulted in the message comming in to memory
+ if (_flowed.getAndSet(false))
+ {
+ _queueEntryList.entryLoadedUpdateMemory(this);
+ }
}
+
+ // Return the message that was either already in memory or the value we just loaded.
+ return message;
}
+ // This can be null but only in the case where we have no messageId
+ // in the case where we have no backingStore then we will never have unloaded the message
+ return _messageRef.get();
}
public boolean isFlowed()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 3ab127b59d..c370fd9867 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -32,6 +32,7 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.subscription.Subscription;
import org.apache.qpid.server.subscription.MockSubscription;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.AMQException;
import java.util.Map;
@@ -96,6 +97,12 @@ public class ExtractResendAndRequeueTest extends TestCase
assertEquals("Map does not contain correct setup data", INITIAL_MSG_COUNT, _unacknowledgedMessageMap.size());
}
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockAMQQueue will create
+ ApplicationRegistry.getInstance().close();
+ }
+
/**
* Helper method to create a new subscription and aquire the given messages.
*
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index 22e49a0241..ee1796ba2f 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -36,11 +36,9 @@ import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.FailedDequeueException;
import org.apache.qpid.server.queue.IncomingMessage;
-import org.apache.qpid.server.queue.MessageCleanupException;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.queue.SimpleAMQQueue;
-import org.apache.qpid.server.queue.UnableToFlowMessageException;
import org.apache.qpid.server.registry.ApplicationRegistry;
import org.apache.qpid.server.store.MemoryMessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
@@ -356,9 +354,9 @@ public class AbstractHeadersExchangeTestBase extends TestCase
//To change body of implemented methods use File | Settings | File Templates.
}
- public void load()
+ public AMQMessage load()
{
- //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public boolean isFlowed()
diff --git a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
index 9344efd4a8..e7b3f40393 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java
@@ -23,10 +23,18 @@ package org.apache.qpid.server.filter;
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.server.queue.MockQueueEntry;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class PropertyExpressionTest extends TestCase
{
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockQueueEntry will create
+ ApplicationRegistry.remove(1);
+ }
+
+
public void testJMSRedelivered()
{
PropertyExpression<AMQException> pe = new PropertyExpression<AMQException>("JMSRedelivered");
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
index b38da53406..11049a7ae3 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQMessage.java
@@ -23,6 +23,14 @@ package org.apache.qpid.server.queue;
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
+
+import java.util.LinkedList;
+import java.util.ArrayList;
public class MockAMQMessage extends TransientAMQMessage
{
@@ -31,6 +39,14 @@ public class MockAMQMessage extends TransientAMQMessage
{
super(messageId);
_messagePublishInfo = new MessagePublishInfoImpl(null,false,false,null);
+ BasicContentHeaderProperties properties = new BasicContentHeaderProperties();
+
+ properties.setMessageId(String.valueOf(messageId));
+ properties.setTimestamp(System.currentTimeMillis());
+ properties.setDeliveryMode((byte)1);
+
+ _contentHeaderBody = new ContentHeaderBody(properties, BasicPublishBodyImpl.CLASS_ID);
+ _contentBodies = new ArrayList<ContentChunk>();
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
index d9e4cc9b70..ff814840bc 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/MockAMQQueue.java
@@ -20,16 +20,18 @@
*/
package org.apache.qpid.server.queue;
+import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.configuration.QueueConfiguration;
+import org.apache.qpid.server.configuration.ServerConfiguration;
+import org.apache.qpid.server.configuration.VirtualHostConfiguration;
import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.server.subscription.Subscription;
-import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.management.ManagedObject;
-import org.apache.qpid.AMQException;
-import org.apache.commons.configuration.Configuration;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.server.subscription.Subscription;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.registry.ApplicationRegistry;
import java.util.List;
import java.util.Set;
@@ -39,10 +41,20 @@ public class MockAMQQueue implements AMQQueue
private boolean _deleted = false;
private int _queueCount;
private AMQShortString _name;
+ private VirtualHost _virtualhost;
public MockAMQQueue(String name)
{
- _name = new AMQShortString(name);
+ _name = new AMQShortString(name);
+ _virtualhost = ApplicationRegistry.getInstance().getVirtualHostRegistry().getVirtualHost("test");
+ try
+ {
+ _virtualhost.getQueueRegistry().registerQueue(this);
+ }
+ catch (AMQException e)
+ {
+ e.printStackTrace();
+ }
}
public AMQShortString getName()
@@ -67,7 +79,7 @@ public class MockAMQQueue implements AMQQueue
public VirtualHost getVirtualHost()
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return _virtualhost;
}
public void bind(Exchange exchange, AMQShortString routingKey, FieldTable arguments) throws AMQException
@@ -152,7 +164,7 @@ public class MockAMQQueue implements AMQQueue
public int delete() throws AMQException
{
- return 0; //To change body of implemented methods use File | Settings | File Templates.
+ return 0; //To change body of implemented methods use File | Settings | File Templates.
}
public QueueEntry enqueue(StoreContext storeContext, AMQMessage message) throws AMQException
@@ -343,7 +355,7 @@ public class MockAMQQueue implements AMQQueue
public void setMinimumAlertRepeatGap(long value)
{
-
+
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
index 6c6835ccca..abcd9855d9 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/security/access/ACLManagerTest.java
@@ -40,6 +40,7 @@ import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MockAMQQueue;
import org.apache.qpid.server.queue.MockProtocolSession;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
+import org.apache.qpid.server.registry.ApplicationRegistry;
public class ACLManagerTest extends TestCase
{
@@ -67,6 +68,12 @@ public class ACLManagerTest extends TestCase
_session = new MockProtocolSession(new TestableMemoryMessageStore());
}
+
+ public void tearDown() throws Exception
+ {
+ //Ensure we close the registry that the MockAMQQueue will create
+ ApplicationRegistry.getInstance().close();
+ }
public void testACLManagerConfigurationPluginManager() throws Exception
{