diff options
| author | Keith Wall <kwall@apache.org> | 2011-11-28 09:26:58 +0000 |
|---|---|---|
| committer | Keith Wall <kwall@apache.org> | 2011-11-28 09:26:58 +0000 |
| commit | 24fb3dba2d14bb149c2829d7154b7cce782903df (patch) | |
| tree | 92259fd5904017cba1e8f047c33752f2718df2ca /java/broker | |
| parent | 4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (diff) | |
| download | qpid-python-24fb3dba2d14bb149c2829d7154b7cce782903df.tar.gz | |
QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly.
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com>
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
5 files changed, 77 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java index b49b12fb79..80c5e2866c 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java @@ -53,12 +53,12 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi } - public void onRelease() + public void onRelease(boolean setRedelivered) { final Subscription_0_10 subscription = getSubscription(); if(subscription != null && _entry.isAcquiredBy(_sub)) { - subscription.release(_entry); + subscription.release(_entry, setRedelivered); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java index b5bb2014b5..a61b0b4e82 100755 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java @@ -43,11 +43,11 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi _logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)"); } - public void onRelease() + public void onRelease(boolean setRedelivered) { if(_entry.isAcquiredBy(_sub)) { - getSubscription().release(_entry); + getSubscription().release(_entry, setRedelivered); } else { diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java index d302c9ad15..273bab0ebe 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java +++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java @@ -24,12 +24,15 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT; import org.apache.qpid.server.queue.AMQQueue; +import org.apache.qpid.server.queue.BaseQueue; +import org.apache.qpid.server.queue.InboundMessageAdapter; import org.apache.qpid.server.queue.QueueEntry; import org.apache.qpid.server.configuration.ConfigStore; import org.apache.qpid.server.configuration.ConfiguredObject; import org.apache.qpid.server.configuration.SessionConfig; import org.apache.qpid.server.configuration.SubscriptionConfig; import org.apache.qpid.server.configuration.SubscriptionConfigType; +import org.apache.qpid.server.exchange.Exchange; import org.apache.qpid.server.flow.FlowCreditManager; import org.apache.qpid.server.flow.CreditCreditManager; import org.apache.qpid.server.flow.WindowCreditManager; @@ -37,9 +40,11 @@ import org.apache.qpid.server.flow.FlowCreditManager_0_10; import org.apache.qpid.server.filter.FilterManager; import org.apache.qpid.server.logging.actors.CurrentActor; import org.apache.qpid.server.logging.actors.GenericActor; +import org.apache.qpid.server.logging.messages.ChannelMessages; import org.apache.qpid.server.logging.messages.SubscriptionMessages; import org.apache.qpid.server.logging.LogActor; import org.apache.qpid.server.logging.LogSubject; +import org.apache.qpid.server.message.InboundMessage; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.message.MessageTransferMessage; import org.apache.qpid.server.message.AMQMessage; @@ -80,6 +85,7 @@ import java.nio.ByteBuffer; public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject { + private final long _subscriptionID; private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this); @@ -601,6 +607,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } _session.sendMessage(xfr, _postIdSettingAction); + entry.incrementDeliveryCount(); _deliveredCount.incrementAndGet(); if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED) { @@ -643,10 +650,68 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr } - void release(QueueEntry entry) + void release(QueueEntry entry, boolean setRedelivered) { - entry.setRedelivered(); - entry.release(); + boolean maxDeliveryLimitExceeded = false; + if (setRedelivered) + { + entry.setRedelivered(); + maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry); + } + else + { + entry.decrementDeliveryCount(); + } + + if (maxDeliveryLimitExceeded) + { + sendToDLQOrDiscard(entry); + } + else + { + entry.release(); + } + } + + protected void sendToDLQOrDiscard(QueueEntry entry) + { + final Exchange alternateExchange = entry.getQueue().getAlternateExchange(); + final LogActor logActor = CurrentActor.get(); + final ServerMessage msg = entry.getMessage(); + if (alternateExchange != null) + { + final InboundMessage m = new InboundMessageAdapter(entry); + + final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m); + + if (destinationQueues == null || destinationQueues.isEmpty()) + { + entry.discard(); + + logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName())); + } + else + { + entry.routeToAlternate(); + + //output operational logging for each delivery post commit + for (final BaseQueue destinationQueue : destinationQueues) + { + logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString())); + } + } + } + else + { + entry.discard(); + logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey())); + } + } + + private boolean isMaxDeliveryLimitExceeded(QueueEntry entry) + { + final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount(); + return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit); } public void queueDeleted(AMQQueue queue) diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java index 7031502e34..ac95750e66 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java @@ -93,7 +93,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi { public void onAccept(); - public void onRelease(); + public void onRelease(boolean setRedelivered); public void onReject(); @@ -230,13 +230,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi } - public void release(RangeSet ranges) + public void release(RangeSet ranges, final boolean setRedelivered) { dispositionChange(ranges, new MessageDispositionAction() { public void performAction(MessageDispositionChangeListener listener) { - listener.onRelease(); + listener.onRelease(setRedelivered); } }); } @@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi _transaction.rollback(); for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values()) { - listener.onRelease(); + listener.onRelease(false); } _messageDispositionListenerMap.clear(); diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java index 754a233907..a0dca53ed0 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java +++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java @@ -148,7 +148,7 @@ public class ServerSessionDelegate extends SessionDelegate @Override public void messageRelease(Session session, MessageRelease method) { - ((ServerSession)session).release(method.getTransfers()); + ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered()); } @Override |
