summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java5
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java16
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java11
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java36
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java2
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java2
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java5
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java26
-rw-r--r--java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java18
15 files changed, 77 insertions, 64 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 9b29c7b921..72a2780c32 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -431,7 +431,7 @@ public class AMQChannel
{
if (_log.isDebugEnabled())
{
- _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag
+ _log.debug(debugIdentity() + " Adding unacked message(" + entry.toString() + " DT:" + deliveryTag
+ ") with a queue(" + entry.getQueue() + ") for " + subscription);
}
}
@@ -551,7 +551,7 @@ public class AMQChannel
}
else
{
- _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity()
+ _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.debugIdentity()
+ "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message.");
unacked.dequeueAndDelete(_storeContext);
diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
index 918fcd8407..95de0dc8c3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java
@@ -102,7 +102,7 @@ public class TxAck implements TxnOp
//buffer must be marked as persistent:
for (QueueEntry msg : _unacked.values())
{
- if (msg.getMessage().isPersistent())
+ if (msg.isPersistent())
{
return true;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
index bd70cd7776..8b04315d33 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java
@@ -96,7 +96,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR
if (_logger.isDebugEnabled())
{
- _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() +
+ _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.debugIdentity() +
": Requeue:" + body.getRequeue() +
//": Resend:" + evt.getMethod().resend +
" on channel:" + channel.debugIdentity());
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
index 36da1c4cb1..2ff54fb748 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
@@ -274,8 +274,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
/**
* Checks if there is any notification to be send to the listeners
+ * @param queueEntry
*/
- public void checkForNotification(AMQMessage msg) throws AMQException
+ public void checkForNotification(QueueEntry queueEntry) throws AMQException
{
final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks();
@@ -289,7 +290,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que
{
if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime))
{
- if (check.notifyIfNecessary(msg, _queue, this))
+ if (check.notifyIfNecessary(queueEntry, _queue, this))
{
_lastNotificationTimes[check.ordinal()] = currentTime;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
index e33b0c83c7..a83d661de2 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,19 +39,19 @@ public enum NotificationCheck
},
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ long messageSize = (queueEntry == null) ? 0 : queueEntry.getSize();
if (messageSize >= maximumMessageSize)
{
listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
maximumMessageSize + ") breached. [Message ID=" +
- (msg == null ? "null" : msg.getMessageId()) + "]");
+ (queueEntry == null ? "null" : queueEntry.getMessageId()) + "]");
return true;
}
}
@@ -63,7 +61,7 @@ public enum NotificationCheck
},
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -84,7 +82,7 @@ public enum NotificationCheck
},
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -126,6 +124,6 @@ public enum NotificationCheck
return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
index 2cb1493a8f..7fc5df4e9e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java
@@ -135,6 +135,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept
AMQMessage getMessage();
+ Long getMessageId();
+
long getSize();
/**
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
index 5114529419..fceaf75a9e 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java
@@ -83,6 +83,7 @@ public class QueueEntryImpl implements QueueEntry
private long _expiration;
private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER);
+ private boolean _persistent;
QueueEntryImpl(SimpleQueueEntryList queueEntryList)
{
@@ -111,6 +112,7 @@ public class QueueEntryImpl implements QueueEntry
_flags |= IMMEDIATE;
}
_expiration = message.getExpiration();
+ _persistent = message.isPersistent();
}
_backingStore = queueEntryList.getBackingStore();
_flowed = new AtomicBoolean(false);
@@ -140,6 +142,11 @@ public class QueueEntryImpl implements QueueEntry
return _message;
}
+ public Long getMessageId()
+ {
+ return _messageId;
+ }
+
public long getSize()
{
return _messageSize;
@@ -245,12 +252,12 @@ public class QueueEntryImpl implements QueueEntry
public ContentHeaderBody getContentHeaderBody() throws AMQException
{
- return _message.getContentHeaderBody();
+ return getMessage().getContentHeaderBody();
}
public boolean isPersistent() throws AMQException
{
- return _message.isPersistent();
+ return _persistent;
}
public boolean isRedelivered()
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
index 9e9895c53b..63ec56c1af 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java
@@ -417,7 +417,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
deliverAsync();
}
- _managedObject.checkForNotification(entry.getMessage());
+ _managedObject.checkForNotification(entry);
return entry;
}
@@ -567,10 +567,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
try
{
- AMQMessage msg = entry.getMessage();
- if (msg.isPersistent())
+ if (entry.isPersistent())
{
- _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId());
+ _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId());
}
}
@@ -761,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return messageId >= fromMessageId && messageId <= toMessageId;
}
@@ -780,7 +779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- _complete = entry.getMessage().getMessageId() == messageId;
+ _complete = entry.getMessageId() == messageId;
return _complete;
}
@@ -829,7 +828,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
return (messageId >= fromMessageId)
&& (messageId <= toMessageId)
&& entry.acquire();
@@ -848,11 +847,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (message.isPersistent() && toQueue.isDurable())
+ if (entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
}
// dequeue will remove the messages from the queue
entry.dequeue(storeContext);
@@ -887,6 +884,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
for (QueueEntry entry : entries)
{
toQueue.enqueue(storeContext, entry.getMessage());
+ // As we only did a dequeue above now that we have moved the message we should perform a delete.
+ // We cannot do this earlier as the message will be lost if flowed.
+ //entry.delete();
}
}
catch (MessageCleanupException e)
@@ -913,7 +913,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
public boolean accept(QueueEntry entry)
{
- final long messageId = entry.getMessage().getMessageId();
+ final long messageId = entry.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId))
{
@@ -939,11 +939,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
// Move the messages in on the transaction log.
for (QueueEntry entry : entries)
{
- AMQMessage message = entry.getMessage();
-
- if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable())
+ if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable())
{
- transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId());
+ transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId());
}
}
@@ -1002,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
{
QueueEntry node = queueListIterator.getNode();
- final long messageId = node.getMessage().getMessageId();
+ final long messageId = node.getMessageId();
if ((messageId >= fromMessageId)
&& (messageId <= toMessageId)
@@ -1438,7 +1436,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
}
else
{
- _managedObject.checkForNotification(node.getMessage());
+ _managedObject.checkForNotification(node);
}
}
@@ -1605,7 +1603,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener
for (int i = 0; i < num && !it.atTail(); i++)
{
it.advance();
- ids.add(it.getNode().getMessage().getMessageId());
+ ids.add(it.getNode().getMessageId());
}
return ids;
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
index d3e69fc1fa..bc1f56fee1 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java
@@ -387,6 +387,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
//todo - client id should be recoreded and this test removed but handled below
if (_noLocal)
{
+ //todo getPublisherClientInstance should be moved to QueueEntryImpl
final Object publisherId = entry.getMessage().getPublisherClientInstance();
// We don't want local messages so check to see if message is one we sent
@@ -407,6 +408,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage
//todo - client id should be recoreded and this test removed but handled here
+ //todo getPublisherIdentifier should be moved to QueueEntryImpl
if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier()))
{
return false;
diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
index 0ca8135f71..2f27e1405a 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
@@ -127,9 +127,9 @@ public class NonTransactionalContext implements TransactionalContext
{
if (debug)
{
- _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessageId());
}
- if(queueEntry.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
@@ -175,9 +175,9 @@ public class NonTransactionalContext implements TransactionalContext
if (debug)
{
- _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId());
+ _log.debug("Discarding message: " + queueEntry.getMessageId());
}
- if(queueEntry.getMessage().isPersistent())
+ if(queueEntry.isPersistent())
{
beginTranIfNecessary();
}
diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
index a8dd58ca83..7fe16062fc 100644
--- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
+++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java
@@ -172,7 +172,7 @@ public class Move extends AbstractCommand
{
for (QueueEntry msg : messages)
{
- ids.add(msg.getMessage().getMessageId());
+ ids.add(msg.getMessageId());
}
}
}
diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
index 2a97db6066..3ab127b59d 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java
@@ -87,7 +87,7 @@ public class ExtractResendAndRequeueTest extends TestCase
while(queueEntries.advance())
{
QueueEntry entry = queueEntries.getNode();
- _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry);
+ _unacknowledgedMessageMap.add(entry.getMessageId(), entry);
// Store the entry for future inspection
_referenceList.add(entry);
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
index c50770d7ba..4c9de73a0b 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
@@ -215,6 +215,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase
return null; //To change body of implemented methods use File | Settings | File Templates.
}
+ public Long getMessageId()
+ {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public long getSize()
{
return 0; //To change body of implemented methods use File | Settings | File Templates.
diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
index f8544a33bd..890b641540 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java
@@ -100,7 +100,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -140,7 +140,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -159,7 +159,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -198,7 +198,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -217,7 +217,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -236,7 +236,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -254,7 +254,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -294,7 +294,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -312,7 +312,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -352,7 +352,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -384,7 +384,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -425,7 +425,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
@@ -464,7 +464,7 @@ public class DestWildExchangeTest extends TestCase
Assert.assertEquals(1, queue.getMessageCount());
- Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId());
+ Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId());
queue.deleteMessageFromTop(_context);
Assert.assertEquals(0, queue.getMessageCount());
diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
index 800bb1ac9c..d7844730d1 100644
--- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
+++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java
@@ -68,17 +68,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest
ArrayList<QueueEntry> msgs = _subscription.getQueueEntries();
try
{
- assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId());
- assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId());
- assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId());
+ assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessageId());
+ assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessageId());
+ assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessageId());
- assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId());
- assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId());
- assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId());
+ assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessageId());
+ assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessageId());
+ assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessageId());
- assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId());
- assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId());
- assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId());
+ assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessageId());
+ assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessageId());
+ assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessageId());
}
catch (AssertionFailedError afe)
{