From 5e264eb11619b5088776a36546acd06415299314 Mon Sep 17 00:00:00 2001 From: Martin Ritchie Date: Fri, 6 Feb 2009 17:07:46 +0000 Subject: QPID-1628 : Moved Redelivered from AMQMessage to QueueEntry Added PropertyExpressionTest to test Redelivered Property git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@741634 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 25 ++++---- .../qpid/server/ExtractResendAndRequeue.java | 25 ++++---- .../qpid/server/filter/PropertyExpression.java | 17 +++--- .../qpid/server/handler/BasicGetMethodHandler.java | 2 +- .../server/output/ProtocolOutputConverter.java | 5 +- .../amqp0_8/ProtocolOutputConverterImpl.java | 24 +++++--- .../amqp0_9/ProtocolOutputConverterImpl.java | 69 +++++++++++----------- .../org/apache/qpid/server/queue/AMQMessage.java | 12 +--- .../apache/qpid/server/queue/AMQMessageHandle.java | 4 -- .../apache/qpid/server/queue/AMQQueueMBean.java | 6 +- .../qpid/server/queue/InMemoryMessageHandle.java | 11 ---- .../qpid/server/queue/MessageHandleFactory.java | 3 + .../org/apache/qpid/server/queue/QueueEntry.java | 5 +- .../apache/qpid/server/queue/QueueEntryImpl.java | 23 +++++++- .../qpid/server/subscription/SubscriptionImpl.java | 2 +- .../qpid/tools/messagestore/commands/Show.java | 2 +- .../exchange/AbstractHeadersExchangeTestBase.java | 15 +++++ .../qpid/server/filter/PropertyExpressionTest.java | 56 ++++++++++++++++++ .../protocol/InternalTestProtocolSession.java | 18 +++--- .../apache/qpid/server/queue/MockQueueEntry.java | 21 ++++++- 20 files changed, 218 insertions(+), 127 deletions(-) create mode 100644 qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java (limited to 'qpid/java') diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 26ac562fb2..5fde08cbdd 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -491,7 +491,7 @@ public class AMQChannel if (!unacked.isQueueDeleted()) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery unacked.release(); @@ -522,7 +522,7 @@ public class AMQChannel if (unacked != null) { // Mark message redelivered - unacked.getMessage().setRedelivered(true); + unacked.setRedelivered(true); // Ensure message is released for redelivery if (!unacked.isQueueDeleted()) @@ -611,13 +611,10 @@ public class AMQChannel for (Map.Entry entry : msgToResend.entrySet()) { - QueueEntry message = entry.getValue(); + QueueEntry queueEntry = entry.getValue(); long deliveryTag = entry.getKey(); - - - AMQMessage msg = message.getMessage(); - AMQQueue queue = message.getQueue(); + AMQQueue queue = queueEntry.getQueue(); // Our Java Client will always suspend the channel when resending! // If the client has requested the messages be resent then it is @@ -635,16 +632,16 @@ public class AMQChannel // Without any details from the client about what has been processed we have to mark // all messages in the unacked map as redelivered. - msg.setRedelivered(true); + queueEntry.setRedelivered(true); - Subscription sub = message.getDeliveredSubscription(); + Subscription sub = queueEntry.getDeliveredSubscription(); if (sub != null) { - if(!queue.resend(message, sub)) + if(!queue.resend(queueEntry, sub)) { - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -652,11 +649,11 @@ public class AMQChannel if (_log.isInfoEnabled()) { - _log.info("DeliveredSubscription not recorded so just requeueing(" + message.toString() + _log.info("DeliveredSubscription not recorded so just requeueing(" + queueEntry.toString() + ")to prevent loss"); } // move this message to requeue - msgToRequeue.put(deliveryTag, message); + msgToRequeue.put(deliveryTag, queueEntry); } } // for all messages // } else !isSuspend @@ -888,7 +885,7 @@ public class AMQChannel public void deliverToClient(final Subscription sub, final QueueEntry entry, final long deliveryTag) throws AMQException { - getProtocolSession().getProtocolOutputConverter().writeDeliver(entry.getMessage(), getChannelId(), deliveryTag, sub.getConsumerTag()); + getProtocolSession().getProtocolOutputConverter().writeDeliver(entry, getChannelId(), deliveryTag, sub.getConsumerTag()); } }; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java index 29494c4118..097ac27399 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/ExtractResendAndRequeue.java @@ -54,22 +54,21 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor _storeContext = storeContext; } - public boolean callback(final long deliveryTag, QueueEntry message) throws AMQException + public boolean callback(final long deliveryTag, QueueEntry queueEntry) throws AMQException { - - AMQMessage msg = message.getMessage(); - msg.setRedelivered(true); - final Subscription subscription = message.getDeliveredSubscription(); + + queueEntry.setRedelivered(true); + final Subscription subscription = queueEntry.getDeliveredSubscription(); if (subscription != null) { // Consumer exists if (!subscription.isClosed()) { - _msgToResend.put(deliveryTag, message); + _msgToResend.put(deliveryTag, queueEntry); } else // consumer has gone { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } } else @@ -77,22 +76,22 @@ public class ExtractResendAndRequeue implements UnacknowledgedMessageMap.Visitor // Message has no consumer tag, so was "delivered" to a GET // or consumer no longer registered // cannot resend, so re-queue. - if (!message.isQueueDeleted()) + if (!queueEntry.isQueueDeleted()) { if (_requeueIfUnabletoResend) { - _msgToRequeue.put(deliveryTag, message); + _msgToRequeue.put(deliveryTag, queueEntry); } else { - message.discard(_storeContext); - _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.info("No DeadLetter Queue and requeue not requested so dropping message:" + queueEntry); } } else { - message.discard(_storeContext); - _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + message); + queueEntry.discard(_storeContext); + _log.warn("Message.queue is null and no DeadLetter Queue so dropping message:" + queueEntry); } } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java index b30c70dac3..fa276169bf 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/filter/PropertyExpression.java @@ -71,13 +71,7 @@ public class PropertyExpression implements Expression JMS_PROPERTY_EXPRESSIONS.put("JMSExpiration", new ExpirationExpression()); - JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new Expression() - { - public Object evaluate(Filterable message) throws E - { - return message.isRedelivered(); - } - }); + JMS_PROPERTY_EXPRESSIONS.put("JMSRedelivered", new RedeliveredExpression()); } private final String name; @@ -265,4 +259,13 @@ public class PropertyExpression implements Expression } } + + private static class RedeliveredExpression implements Expression + { + public Object evaluate(Filterable message) throws E + { + return message.isRedelivered(); + } + } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java index 001b7858ec..0f492a21bb 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicGetMethodHandler.java @@ -130,7 +130,7 @@ public class BasicGetMethodHandler implements StateAwareMethodListener +public class AMQMessage { /** Used for debugging purposes. */ private static final Logger _log = Logger.getLogger(AMQMessage.class); @@ -396,16 +396,6 @@ public class AMQMessage implements Filterable return _messageHandle.getMessagePublishInfo(getStoreContext()); } - public boolean isRedelivered() - { - return _messageHandle.isRedelivered(); - } - - public void setRedelivered(boolean redelivered) - { - _messageHandle.setRedelivered(redelivered); - } - public long getArrivalTime() { return _messageHandle.getArrivalTime(); diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java index 0ddd4e4d92..93ac21fc7c 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessageHandle.java @@ -63,10 +63,6 @@ public interface AMQMessageHandle MessagePublishInfo getMessagePublishInfo(StoreContext context) throws AMQException; - boolean isRedelivered(); - - void setRedelivered(boolean redelivered); - boolean isPersistent(); void setPublishAndContentHeaderBody(StoreContext storeContext, MessagePublishInfo messagePublishInfo, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java index 2ed6be77c6..6f478dffd7 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java @@ -397,11 +397,13 @@ public class AMQQueueMBean extends AMQManagedObject implements ManagedQueue, Que // Create the tabular list of message header contents for (int i = beginIndex; (i <= endIndex) && (i <= list.size()); i++) { - AMQMessage msg = list.get(i - 1).getMessage(); + QueueEntry queueEntry = list.get(i - 1); + AMQMessage msg = queueEntry.getMessage(); ContentHeaderBody headerBody = msg.getContentHeaderBody(); // Create header attributes list String[] headerAttributes = getMessageHeaderProperties(headerBody); - Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, msg.isRedelivered() }; + Object[] itemValues = { msg.getMessageId(), headerAttributes, headerBody.bodySize, + queueEntry.isRedelivered() }; CompositeData messageData = new CompositeDataSupport(_messageDataType, _msgAttributeNames, itemValues); _messageList.put(messageData); } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java index 35ad5be4e0..2a7c90a81e 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/InMemoryMessageHandle.java @@ -109,17 +109,6 @@ public class InMemoryMessageHandle implements AMQMessageHandle return _messagePublishInfo; } - public boolean isRedelivered() - { - return _redelivered; - } - - - public void setRedelivered(boolean redelivered) - { - _redelivered = redelivered; - } - public boolean isPersistent() { return false; diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java index 0b214ca336..7573a629c1 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageHandleFactory.java @@ -42,5 +42,8 @@ public class MessageHandleFactory { return new InMemoryMessageHandle(messageId); } + +// return new AMQMessage(messageId, store, persistent); } + } diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java index 2657c459a9..0df976a620 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntry.java @@ -24,11 +24,8 @@ import org.apache.qpid.server.subscription.Subscription; * under the License. * */ -public interface QueueEntry extends Comparable +public interface QueueEntry extends Comparable, Filterable { - - - public static enum State { AVAILABLE, diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java index dbad5438dc..fe9686e906 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/QueueEntryImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; import org.apache.log4j.Logger; @@ -44,6 +45,7 @@ public class QueueEntryImpl implements QueueEntry private AMQMessage _message; + private boolean _redelivered; private Set _rejectedBy = null; @@ -186,9 +188,26 @@ public class QueueEntryImpl implements QueueEntry return _message.immediateAndNotDelivered(); } - public void setRedelivered(boolean b) + public ContentHeaderBody getContentHeaderBody() throws AMQException { - getMessage().setRedelivered(b); + return _message.getContentHeaderBody(); + } + + public boolean isPersistent() throws AMQException + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _redelivered; + } + + public void setRedelivered(boolean redelivered) + { + _redelivered = redelivered; + // todo - here we could mark this message as redelivered so we don't have to mark + // all messages on recover as redelivered. } public Subscription getDeliveredSubscription() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java index a616c2ea35..be11eb7b84 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/subscription/SubscriptionImpl.java @@ -434,7 +434,7 @@ public abstract class SubscriptionImpl implements Subscription, FlowCreditManage private boolean checkFilters(QueueEntry msg) { - return (_filters == null) || _filters.allAllow(msg.getMessage()); + return (_filters == null) || _filters.allAllow(msg); } public boolean isAutoClose() diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java index 2fa017fc64..b5a91c8da6 100644 --- a/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java +++ b/qpid/java/broker/src/main/java/org/apache/qpid/tools/messagestore/commands/Show.java @@ -358,7 +358,7 @@ public class Show extends AbstractCommand ispersitent.add("n/a"); } - isredelivered.add(msg.isRedelivered() ? "true" : "false"); + isredelivered.add(entry.isRedelivered() ? "true" : "false"); isdelivered.add(msg.getDeliveredToConsumer() ? "true" : "false"); diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java index 6dcb187a37..12fa4ef952 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java @@ -405,6 +405,21 @@ public class AbstractHeadersExchangeTestBase extends TestCase { return 0; //To change body of implemented methods use File | Settings | File Templates. } + + public ContentHeaderBody getContentHeaderBody() throws AMQException + { + return null; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isPersistent() throws AMQException + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } + + public boolean isRedelivered() + { + return false; //To change body of implemented methods use File | Settings | File Templates. + } }; } diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java new file mode 100644 index 0000000000..9344efd4a8 --- /dev/null +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/filter/PropertyExpressionTest.java @@ -0,0 +1,56 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.filter; + +import junit.framework.TestCase; +import org.apache.qpid.AMQException; +import org.apache.qpid.server.queue.MockQueueEntry; + +public class PropertyExpressionTest extends TestCase +{ + + public void testJMSRedelivered() + { + PropertyExpression pe = new PropertyExpression("JMSRedelivered"); + + MockQueueEntry queueEntry = new MockQueueEntry(); + + try + { + assertEquals("MockQueueEntry.redelivered should initialy be false", Boolean.FALSE, pe.evaluate(queueEntry)); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + + queueEntry.setRedelivered(true); + + try + { + assertEquals("MockQueueEntry.redelivered not updated", Boolean.TRUE, pe.evaluate(queueEntry)); + } + catch (AMQException e) + { + fail(e.getMessage()); + } + } +} diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java index da35ddc594..08f6fae230 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/protocol/InternalTestProtocolSession.java @@ -25,8 +25,8 @@ import org.apache.qpid.codec.AMQCodecFactory; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.output.ProtocolOutputConverter; import org.apache.qpid.server.queue.AMQMessage; +import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.registry.ApplicationRegistry; -import org.apache.qpid.server.AMQChannel; import java.util.ArrayList; import java.util.HashMap; @@ -99,7 +99,7 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen { } - public void writeDeliver(AMQMessage message, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException + public void writeDeliver(QueueEntry queueEntry, int channelId, long deliveryTag, AMQShortString consumerTag) throws AMQException { _deliveryCount.incrementAndGet(); @@ -121,11 +121,11 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen consumers.put(consumerTag, consumerDelivers); } - consumerDelivers.add(new DeliveryPair(deliveryTag, message)); + consumerDelivers.add(new DeliveryPair(deliveryTag, queueEntry)); } } - public void writeGetOk(AMQMessage message, int channelId, long deliveryTag, int queueSize) throws AMQException + public void writeGetOk(QueueEntry queueEntry, int channelId, long deliveryTag, int queueSize) throws AMQException { } @@ -147,17 +147,17 @@ public class InternalTestProtocolSession extends AMQMinaProtocolSession implemen public class DeliveryPair { private long _deliveryTag; - private AMQMessage _message; + private QueueEntry _queueEntry; - public DeliveryPair(long deliveryTag, AMQMessage message) + public DeliveryPair(long deliveryTag, QueueEntry queueEntry) { _deliveryTag = deliveryTag; - _message = message; + _queueEntry = queueEntry; } - public AMQMessage getMessage() + public QueueEntry getMessage() { - return _message; + return _queueEntry; } public long getDeliveryTag() diff --git a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java index 37f91e7464..ed7b2923e7 100644 --- a/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java +++ b/qpid/java/broker/src/test/java/org/apache/qpid/server/queue/MockQueueEntry.java @@ -21,6 +21,7 @@ package org.apache.qpid.server.queue; import org.apache.qpid.AMQException; +import org.apache.qpid.framing.ContentHeaderBody; import org.apache.qpid.server.store.StoreContext; import org.apache.qpid.server.subscription.Subscription; @@ -28,6 +29,7 @@ public class MockQueueEntry implements QueueEntry { private AMQMessage _message; + private boolean _redelivered; public boolean acquire() { @@ -176,10 +178,9 @@ public class MockQueueEntry implements QueueEntry } - public void setRedelivered(boolean b) + public void setRedelivered(boolean redelivered) { - - + _redelivered = redelivered; } @@ -194,4 +195,18 @@ public class MockQueueEntry implements QueueEntry _message = msg; } + public ContentHeaderBody getContentHeaderBody() throws AMQException + { + return _message.getContentHeaderBody(); + } + + public boolean isPersistent() throws AMQException + { + return _message.isPersistent(); + } + + public boolean isRedelivered() + { + return _redelivered; + } } -- cgit v1.2.1