From 2bb0715ab45f92afcc9d037ed7c5c3da8c818550 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 6 Feb 2009 17:07:46 +0000 Subject: QPID-1628 : Moved Redelivered from AMQMessage to QueueEntry Added PropertyExpressionTest to test Redelivered Property git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@741634 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 25 ++++---- .../qpid/server/ExtractResendAndRequeue.java | 25 ++++---- .../qpid/server/filter/PropertyExpression.java | 17 +++--- .../qpid/server/handler/BasicGetMethodHandler.java | 2 +- .../server/output/ProtocolOutputConverter.java | 5 +- .../amqp0_8/ProtocolOutputConverterImpl.java | 24 +++++--- .../amqp0_9/ProtocolOutputConverterImpl.java | 69 +++++++++++----------- .../org/apache/qpid/server/queue/AMQMessage.java | 12 +--- .../apache/qpid/server/queue/AMQMessageHandle.java | 4 -- .../apache/qpid/server/queue/AMQQueueMBean.java | 6 +- .../qpid/server/queue/InMemoryMessageHandle.java | 11 ---- .../qpid/server/queue/MessageHandleFactory.java | 3 + .../org/apache/qpid/server/queue/QueueEntry.java | 5 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 23 +++++++- .../qpid/server/subscription/SubscriptionImpl.java | 2 +- .../qpid/tools/messagestore/commands/Show.java | 2 +- 16 files changed, 120 insertions(+), 115 deletions(-) (limited to 'java/broker/src/main') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 26ac562fb2..5fde08cbdd 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -491,7 +491,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery unacked.release(); @@ -522,7 +522,7 @@ public class AMQChannel if (unacked != null) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) @@ -611,13 +611,10 @@ public class AMQChannel for (Map.Entry entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + QueueEntry queueEntry = entry.getValue(); long deliveryTag = entry.getKey(); - - - AMQMessage msg = message.getMessage(); - AMQQueue queue = message.getQueue(); + AMQQueue queue = queueEntry.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -635,16 +632,16 @@ public class AMQChannel // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + queueEntry.setRedelivered(true); - Subscription sub = message.getDeliveredSubscription(); + Subscription sub = queueEntry.getDeliveredSubscription(); if (sub != null) { - if(!queue.resend(message, sub)) + if(!queue.resend(queueEntry, sub)) { - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -652,11 +649,11 @@ public class AMQChannel if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString() + ")to prevent loss"); } // move this message to requeue - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } // for all messages // } else !isSuspend @@ -888,7 +885,7 @@ public class AMQChannel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); } }; diff --git a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 29494c4118..097ac27399 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -54,22 +54,21 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor _storeContext = storeContext; } - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { - - AMQMessage msg = message.getMessage(); - msg.setRedelivered(true); - final Subscription subscription = message.getDeliveredSubscription(); + + queueEntry.setRedelivered(true); + final Subscription subscription = queueEntry.getDeliveredSubscription(); if (subscription != null) { // Consumer exists if (!subscription.isClosed()) { - _msgToResend.put(deliveryTag, message); + _msgToResend.put(deliveryTag, queueEntry); } else // consumer has gone { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -77,22 +76,22 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered // cannot resend, so re-queue. - if (!message.isQueueDeleted()) + if (!queueEntry.isQueueDeleted()) { if (_requeueIfUnabletoResend) { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } else { - message.discard(_storeContext); - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - message.discard(_storeContext); - _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index b30c70dac3..fa276169bf 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -71,13 +71,7 @@ public class PropertyExpression implements Expression JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression()); - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() - { - public Object evaluate(Filterable message) throws E - { - return message.isRedelivered(); - } - }); + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new RedeliveredExpression()); } private final String name; @@ -265,4 +259,13 @@ public class PropertyExpression implements Expression } } + + private static class RedeliveredExpression implements Expression + { + public Object evaluate(Filterable message) throws E + { + return message.isRedelivered(); + } + } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 001b7858ec..0f492a21bb 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -130,7 +130,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener +public class AMQMessage { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -396,16 +396,6 @@ public class AMQMessage implements Filterable return _messageHandle.getMessagePublishInfo(getStoreContext()); } - public boolean isRedelivered() - { - return _messageHandle.isRedelivered(); - } - - public void setRedelivered(boolean redelivered) - { - _messageHandle.setRedelivered(redelivered); - } - public long getArrivalTime() { return _messageHandle.getArrivalTime(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 0ddd4e4d92..93ac21fc7c 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -63,10 +63,6 @@ public interface AMQMessageHandle MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException; - boolean isRedelivered(); - - void setRedelivered(boolean redelivered); - boolean isPersistent(); void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 2ed6be77c6..6f478dffd7 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -397,11 +397,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create the tabular list of message header contents for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - AMQMessage msg = list.get(i - 1).getMessage(); + QueueEntry queueEntry = list.get(i - 1); + AMQMessage msg = queueEntry.getMessage(); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() }; + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, + queueEntry.isRedelivered() }; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 35ad5be4e0..2a7c90a81e 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -109,17 +109,6 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _messagePublishInfo; } - public boolean isRedelivered() - { - return _redelivered; - } - - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - public boolean isPersistent() { return false; diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java index 0b214ca336..7573a629c1 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -42,5 +42,8 @@ public class MessageHandleFactory { return new InMemoryMessageHandle(messageId); } + +// return new AMQMessage(messageId, store, persistent); } + } diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2657c459a9..0df976a620 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -24,11 +24,8 @@ import org.apache.qpid.server.subscription.Subscription; * under the License. * */ -public interface QueueEntry extends Comparable +public interface QueueEntry extends Comparable, Filterable { - - - public static enum State { AVAILABLE, diff --git a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..fe9686e906 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.log4j.Logger; @@ -44,6 +45,7 @@ public class QueueEntryImpl implements QueueEntry private AMQMessage _message; + private boolean _redelivered; private Set _rejectedBy = null; @@ -186,9 +188,26 @@ public class QueueEntryImpl implements QueueEntry return _message.immediateAndNotDelivered(); } - public void setRedelivered(boolean b) + public ContentHeaderBody getContentHeaderBody() throws AMQException { - getMessage().setRedelivered(b); + return _message.getContentHeaderBody(); + } + + public boolean isPersistent() throws AMQException + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + // todo - here we could mark this message as redelivered so we don't have to mark + // all messages on recover as redelivered. } public Subscription getDeliveredSubscription() diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index a616c2ea35..be11eb7b84 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -434,7 +434,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg.getMessage()); + return (_filters == null) || _filters.allAllow(msg); } public boolean isAutoClose() diff --git a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 2fa017fc64..b5a91c8da6 100644 --- a/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -358,7 +358,7 @@ public class Show extends AbstractCommand ispersitent.add("n/a"); } - isredelivered.add(msg.isRedelivered() ? "true" : "false"); + isredelivered.add(entry.isRedelivered() ? "true" : "false"); isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); -- cgit v1.2.1