diff options
Diffstat (limited to 'java/broker')
15 files changed, 77 insertions, 64 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 9b29c7b921..72a2780c32 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -431,7 +431,7 @@ public class AMQChannel { if (_log.isDebugEnabled()) { - _log.debug(debugIdentity() + " Adding unacked message(" + entry.getMessage().toString() + " DT:" + deliveryTag + _log.debug(debugIdentity() + " Adding unacked message(" + entry.toString() + " DT:" + deliveryTag + ") with a queue(" + entry.getQueue() + ") for " + subscription); } } @@ -551,7 +551,7 @@ public class AMQChannel } else { - _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.getMessage().debugIdentity() + _log.warn(System.identityHashCode(this) + " Requested requeue of message(" + unacked.debugIdentity() + "):" + deliveryTag + " but no queue defined and no DeadLetter queue so DROPPING message."); unacked.dequeueAndDelete(_storeContext); diff --git a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java index 918fcd8407..95de0dc8c3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ack/TxAck.java @@ -102,7 +102,7 @@ public class TxAck implements TxnOp //buffer must be marked as persistent: for (QueueEntry msg : _unacked.values()) { - if (msg.getMessage().isPersistent()) + if (msg.isPersistent()) { return true; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java index bd70cd7776..8b04315d33 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRejectMethodHandler.java @@ -96,7 +96,7 @@ public class BasicRejectMethodHandler implements StateAwareMethodListener<BasicR if (_logger.isDebugEnabled()) { - _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.getMessage().debugIdentity() + + _logger.debug("Rejecting: DT:" + deliveryTag + "-" + queueEntry.debugIdentity() + ": Requeue:" + body.getRequeue() + //": Resend:" + evt.getMethod().resend + " on channel:" + channel.debugIdentity()); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 36da1c4cb1..2ff54fb748 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -274,8 +274,9 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que /** * Checks if there is any notification to be send to the listeners + * @param queueEntry */ - public void checkForNotification(AMQMessage msg) throws AMQException + public void checkForNotification(QueueEntry queueEntry) throws AMQException { final Set<NotificationCheck> notificationChecks = _queue.getNotificationChecks(); @@ -289,7 +290,7 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que { if (check.isMessageSpecific() || (_lastNotificationTimes[check.ordinal()] < thresholdTime)) { - if (check.notifyIfNecessary(msg, _queue, this)) + if (check.notifyIfNecessary(queueEntry, _queue, this)) { _lastNotificationTimes[check.ordinal()] = currentTime; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java index e33b0c83c7..a83d661de2 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/NotificationCheck.java @@ -20,14 +20,12 @@ */
package org.apache.qpid.server.queue;
-import org.apache.qpid.AMQException;
-
public enum NotificationCheck
{
MESSAGE_COUNT_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
int msgCount;
final long maximumMessageCount = queue.getMaximumMessageCount();
@@ -41,19 +39,19 @@ public enum NotificationCheck },
MESSAGE_SIZE_ALERT(true)
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maximumMessageSize = queue.getMaximumMessageSize();
if(maximumMessageSize != 0)
{
// Check for threshold message size
- long messageSize = (msg == null) ? 0 : msg.getContentHeaderBody().bodySize;
+ long messageSize = (queueEntry == null) ? 0 : queueEntry.getSize();
if (messageSize >= maximumMessageSize)
{
listener.notifyClients(this, queue, messageSize + "b : Maximum message size threshold (" +
maximumMessageSize + ") breached. [Message ID=" +
- (msg == null ? "null" : msg.getMessageId()) + "]");
+ (queueEntry == null ? "null" : queueEntry.getMessageId()) + "]");
return true;
}
}
@@ -63,7 +61,7 @@ public enum NotificationCheck },
QUEUE_DEPTH_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
// Check for threshold queue depth in bytes
final long maximumQueueDepth = queue.getMaximumQueueDepth();
@@ -84,7 +82,7 @@ public enum NotificationCheck },
MESSAGE_AGE_ALERT
{
- boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener)
+ boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener)
{
final long maxMessageAge = queue.getMaximumMessageAge();
@@ -126,6 +124,6 @@ public enum NotificationCheck return _messageSpecific;
}
- abstract boolean notifyIfNecessary(AMQMessage msg, AMQQueue queue, QueueNotificationListener listener);
+ abstract boolean notifyIfNecessary(QueueEntry queueEntry, AMQQueue queue, QueueNotificationListener listener);
}
diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2cb1493a8f..7fc5df4e9e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -135,6 +135,8 @@ public interface QueueEntry extends Comparable<QueueEntry>, Filterable<AMQExcept AMQMessage getMessage(); + Long getMessageId(); + long getSize(); /** diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index 5114529419..fceaf75a9e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -83,6 +83,7 @@ public class QueueEntryImpl implements QueueEntry private long _expiration; private static final byte IMMEDIATE_AND_DELIVERED = (byte) (IMMEDIATE | DELIVERED_TO_CONSUMER); + private boolean _persistent; QueueEntryImpl(SimpleQueueEntryList queueEntryList) { @@ -111,6 +112,7 @@ public class QueueEntryImpl implements QueueEntry _flags |= IMMEDIATE; } _expiration = message.getExpiration(); + _persistent = message.isPersistent(); } _backingStore = queueEntryList.getBackingStore(); _flowed = new AtomicBoolean(false); @@ -140,6 +142,11 @@ public class QueueEntryImpl implements QueueEntry return _message; } + public Long getMessageId() + { + return _messageId; + } + public long getSize() { return _messageSize; @@ -245,12 +252,12 @@ public class QueueEntryImpl implements QueueEntry public ContentHeaderBody getContentHeaderBody() throws AMQException { - return _message.getContentHeaderBody(); + return getMessage().getContentHeaderBody(); } public boolean isPersistent() throws AMQException { - return _message.isPersistent(); + return _persistent; } public boolean isRedelivered() diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java index 9e9895c53b..63ec56c1af 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/SimpleAMQQueue.java @@ -417,7 +417,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener deliverAsync(); } - _managedObject.checkForNotification(entry.getMessage()); + _managedObject.checkForNotification(entry); return entry; } @@ -567,10 +567,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener try { - AMQMessage msg = entry.getMessage(); - if (msg.isPersistent()) + if (entry.isPersistent()) { - _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, msg.getMessageId()); + _virtualHost.getTransactionLog().dequeueMessage(storeContext, this, entry.getMessageId()); } } @@ -761,7 +760,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); return messageId >= fromMessageId && messageId <= toMessageId; } @@ -780,7 +779,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - _complete = entry.getMessage().getMessageId() == messageId; + _complete = entry.getMessageId() == messageId; return _complete; } @@ -829,7 +828,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); return (messageId >= fromMessageId) && (messageId <= toMessageId) && entry.acquire(); @@ -848,11 +847,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in the transaction log. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); - - if (message.isPersistent() && toQueue.isDurable()) + if (entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); } // dequeue will remove the messages from the queue entry.dequeue(storeContext); @@ -887,6 +884,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener for (QueueEntry entry : entries) { toQueue.enqueue(storeContext, entry.getMessage()); + // As we only did a dequeue above now that we have moved the message we should perform a delete. + // We cannot do this earlier as the message will be lost if flowed. + //entry.delete(); } } catch (MessageCleanupException e) @@ -913,7 +913,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener public boolean accept(QueueEntry entry) { - final long messageId = entry.getMessage().getMessageId(); + final long messageId = entry.getMessageId(); if ((messageId >= fromMessageId) && (messageId <= toMessageId)) { @@ -939,11 +939,9 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener // Move the messages in on the transaction log. for (QueueEntry entry : entries) { - AMQMessage message = entry.getMessage(); - - if (!entry.isDeleted() && message.isPersistent() && toQueue.isDurable()) + if (!entry.isDeleted() && entry.isPersistent() && toQueue.isDurable()) { - transactionLog.enqueueMessage(storeContext, toQueue, message.getMessageId()); + transactionLog.enqueueMessage(storeContext, toQueue, entry.getMessageId()); } } @@ -1002,7 +1000,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener { QueueEntry node = queueListIterator.getNode(); - final long messageId = node.getMessage().getMessageId(); + final long messageId = node.getMessageId(); if ((messageId >= fromMessageId) && (messageId <= toMessageId) @@ -1438,7 +1436,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener } else { - _managedObject.checkForNotification(node.getMessage()); + _managedObject.checkForNotification(node); } } @@ -1605,7 +1603,7 @@ public class SimpleAMQQueue implements AMQQueue, Subscription.StateListener for (int i = 0; i < num && !it.atTail(); i++) { it.advance(); - ids.add(it.getNode().getMessage().getMessageId()); + ids.add(it.getNode().getMessageId()); } return ids; } diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index d3e69fc1fa..bc1f56fee1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -387,6 +387,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //todo - client id should be recoreded and this test removed but handled below if (_noLocal) { + //todo getPublisherClientInstance should be moved to QueueEntryImpl final Object publisherId = entry.getMessage().getPublisherClientInstance(); // We don't want local messages so check to see if message is one we sent @@ -407,6 +408,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage //todo - client id should be recoreded and this test removed but handled here + //todo getPublisherIdentifier should be moved to QueueEntryImpl if (localInstance != null && localInstance.equals(entry.getMessage().getPublisherIdentifier())) { return false; diff --git a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java index 0ca8135f71..2f27e1405a 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java +++ b/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java @@ -127,9 +127,9 @@ public class NonTransactionalContext implements TransactionalContext { if (debug) { - _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessageId()); } - if(queueEntry.getMessage().isPersistent()) + if(queueEntry.isPersistent()) { beginTranIfNecessary(); } @@ -175,9 +175,9 @@ public class NonTransactionalContext implements TransactionalContext if (debug) { - _log.debug("Discarding message: " + queueEntry.getMessage().getMessageId()); + _log.debug("Discarding message: " + queueEntry.getMessageId()); } - if(queueEntry.getMessage().isPersistent()) + if(queueEntry.isPersistent()) { beginTranIfNecessary(); } diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java index a8dd58ca83..7fe16062fc 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Move.java @@ -172,7 +172,7 @@ public class Move extends AbstractCommand { for (QueueEntry msg : messages) { - ids.add(msg.getMessage().getMessageId()); + ids.add(msg.getMessageId()); } } } diff --git a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java index 2a97db6066..3ab127b59d 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/ExtractResendAndRequeueTest.java @@ -87,7 +87,7 @@ public class ExtractResendAndRequeueTest extends TestCase while(queueEntries.advance()) { QueueEntry entry = queueEntries.getNode(); - _unacknowledgedMessageMap.add(entry.getMessage().getMessageId(), entry); + _unacknowledgedMessageMap.add(entry.getMessageId(), entry); // Store the entry for future inspection _referenceList.add(entry); diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index c50770d7ba..4c9de73a0b 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -215,6 +215,11 @@ public class AbstractHeadersExchangeTestBase extends TestCase return null; //To change body of implemented methods use File | Settings | File Templates. } + public Long getMessageId() + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + public long getSize() { return 0; //To change body of implemented methods use File | Settings | File Templates. diff --git a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java index f8544a33bd..890b641540 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/exchange/DestWildExchangeTest.java @@ -100,7 +100,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -140,7 +140,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -159,7 +159,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -198,7 +198,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -217,7 +217,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -236,7 +236,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -254,7 +254,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -294,7 +294,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -312,7 +312,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -352,7 +352,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -384,7 +384,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -425,7 +425,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); @@ -464,7 +464,7 @@ public class DestWildExchangeTest extends TestCase Assert.assertEquals(1, queue.getMessageCount()); - Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessage().getMessageId()); + Assert.assertEquals("Wrong message recevied", (Object) message.getMessageId(), queue.getMessagesOnTheQueue().get(0).getMessageId()); queue.deleteMessageFromTop(_context); Assert.assertEquals(0, queue.getMessageCount()); diff --git a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java index 800bb1ac9c..d7844730d1 100644 --- a/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java +++ b/java/broker/src/test/java/org/apache/qpid/server/queue/AMQPriorityQueueTest.java @@ -68,17 +68,17 @@ public class AMQPriorityQueueTest extends SimpleAMQQueueTest ArrayList<QueueEntry> msgs = _subscription.getQueueEntries(); try { - assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessage().getMessageId()); - assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessage().getMessageId()); - assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessage().getMessageId()); + assertEquals(new Long(1 + messagIDOffset), msgs.get(0).getMessageId()); + assertEquals(new Long(6 + messagIDOffset), msgs.get(1).getMessageId()); + assertEquals(new Long(8 + messagIDOffset), msgs.get(2).getMessageId()); - assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessage().getMessageId()); - assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessage().getMessageId()); - assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessage().getMessageId()); + assertEquals(new Long(2 + messagIDOffset), msgs.get(3).getMessageId()); + assertEquals(new Long(5 + messagIDOffset), msgs.get(4).getMessageId()); + assertEquals(new Long(7 + messagIDOffset), msgs.get(5).getMessageId()); - assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessage().getMessageId()); - assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessage().getMessageId()); - assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessage().getMessageId()); + assertEquals(new Long(3 + messagIDOffset), msgs.get(6).getMessageId()); + assertEquals(new Long(4 + messagIDOffset), msgs.get(7).getMessageId()); + assertEquals(new Long(9 + messagIDOffset), msgs.get(8).getMessageId()); } catch (AssertionFailedError afe) { |
