summaryrefslogtreecommitdiff
path: root/java/broker
diff options
context:
space:
mode:
authorKeith Wall <kwall@apache.org>2011-11-28 09:26:58 +0000
committerKeith Wall <kwall@apache.org>2011-11-28 09:26:58 +0000
commit24fb3dba2d14bb149c2829d7154b7cce782903df (patch)
tree92259fd5904017cba1e8f047c33752f2718df2ca /java/broker
parent4ee4c8776c48bdc0a2bd1c2e34e71bf3a63e33cd (diff)
downloadqpid-python-24fb3dba2d14bb149c2829d7154b7cce782903df.tar.gz
QPID-3642, QPID-3640: Add Dead Letter Queue functionality for 0-10 path. Also, it fixes issue with setting of redelivered flag for pre-fetched messages as DLQ functionality relies on this flag being set correctly.
Applied patch from Andrew MacBean <andymacbean@gmail.com> and Oleksandr Rudyy<orudyy@gmail.com> git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@1207031 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker')
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java4
-rwxr-xr-xjava/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java4
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java71
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java8
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java2
5 files changed, 77 insertions, 12 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
index b49b12fb79..80c5e2866c 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ExplicitAcceptDispositionChangeListener.java
@@ -53,12 +53,12 @@ class ExplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
}
- public void onRelease()
+ public void onRelease(boolean setRedelivered)
{
final Subscription_0_10 subscription = getSubscription();
if(subscription != null && _entry.isAcquiredBy(_sub))
{
- subscription.release(_entry);
+ subscription.release(_entry, setRedelivered);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
index b5bb2014b5..a61b0b4e82 100755
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/ImplicitAcceptDispositionChangeListener.java
@@ -43,11 +43,11 @@ class ImplicitAcceptDispositionChangeListener implements ServerSession.MessageDi
_logger.warn("MessageAccept received for message which is using NONE as the accept mode (likely client error)");
}
- public void onRelease()
+ public void onRelease(boolean setRedelivered)
{
if(_entry.isAcquiredBy(_sub))
{
- getSubscription().release(_entry);
+ getSubscription().release(_entry, setRedelivered);
}
else
{
diff --git a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
index d302c9ad15..273bab0ebe 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/subscription/Subscription_0_10.java
@@ -24,12 +24,15 @@ import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.SUBSCRIPT
import static org.apache.qpid.server.logging.subjects.LogSubjectFormat.QUEUE_FORMAT;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.BaseQueue;
+import org.apache.qpid.server.queue.InboundMessageAdapter;
import org.apache.qpid.server.queue.QueueEntry;
import org.apache.qpid.server.configuration.ConfigStore;
import org.apache.qpid.server.configuration.ConfiguredObject;
import org.apache.qpid.server.configuration.SessionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfig;
import org.apache.qpid.server.configuration.SubscriptionConfigType;
+import org.apache.qpid.server.exchange.Exchange;
import org.apache.qpid.server.flow.FlowCreditManager;
import org.apache.qpid.server.flow.CreditCreditManager;
import org.apache.qpid.server.flow.WindowCreditManager;
@@ -37,9 +40,11 @@ import org.apache.qpid.server.flow.FlowCreditManager_0_10;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.logging.actors.CurrentActor;
import org.apache.qpid.server.logging.actors.GenericActor;
+import org.apache.qpid.server.logging.messages.ChannelMessages;
import org.apache.qpid.server.logging.messages.SubscriptionMessages;
import org.apache.qpid.server.logging.LogActor;
import org.apache.qpid.server.logging.LogSubject;
+import org.apache.qpid.server.message.InboundMessage;
import org.apache.qpid.server.message.ServerMessage;
import org.apache.qpid.server.message.MessageTransferMessage;
import org.apache.qpid.server.message.AMQMessage;
@@ -80,6 +85,7 @@ import java.nio.ByteBuffer;
public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCreditManagerListener, SubscriptionConfig, LogSubject
{
+
private final long _subscriptionID;
private final QueueEntry.SubscriptionAcquiredState _owningState = new QueueEntry.SubscriptionAcquiredState(this);
@@ -601,6 +607,7 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
_session.sendMessage(xfr, _postIdSettingAction);
+ entry.incrementDeliveryCount();
_deliveredCount.incrementAndGet();
if(_acceptMode == MessageAcceptMode.NONE && _acquireMode == MessageAcquireMode.PRE_ACQUIRED)
{
@@ -643,10 +650,68 @@ public class Subscription_0_10 implements Subscription, FlowCreditManager.FlowCr
}
- void release(QueueEntry entry)
+ void release(QueueEntry entry, boolean setRedelivered)
{
- entry.setRedelivered();
- entry.release();
+ boolean maxDeliveryLimitExceeded = false;
+ if (setRedelivered)
+ {
+ entry.setRedelivered();
+ maxDeliveryLimitExceeded = isMaxDeliveryLimitExceeded(entry);
+ }
+ else
+ {
+ entry.decrementDeliveryCount();
+ }
+
+ if (maxDeliveryLimitExceeded)
+ {
+ sendToDLQOrDiscard(entry);
+ }
+ else
+ {
+ entry.release();
+ }
+ }
+
+ protected void sendToDLQOrDiscard(QueueEntry entry)
+ {
+ final Exchange alternateExchange = entry.getQueue().getAlternateExchange();
+ final LogActor logActor = CurrentActor.get();
+ final ServerMessage msg = entry.getMessage();
+ if (alternateExchange != null)
+ {
+ final InboundMessage m = new InboundMessageAdapter(entry);
+
+ final ArrayList<? extends BaseQueue> destinationQueues = alternateExchange.route(m);
+
+ if (destinationQueues == null || destinationQueues.isEmpty())
+ {
+ entry.discard();
+
+ logActor.message( ChannelMessages.DISCARDMSG_NOROUTE(msg.getMessageNumber(), alternateExchange.getName()));
+ }
+ else
+ {
+ entry.routeToAlternate();
+
+ //output operational logging for each delivery post commit
+ for (final BaseQueue destinationQueue : destinationQueues)
+ {
+ logActor.message( ChannelMessages.DEADLETTERMSG(msg.getMessageNumber(), destinationQueue.getNameShortString().asString()));
+ }
+ }
+ }
+ else
+ {
+ entry.discard();
+ logActor.message(ChannelMessages.DISCARDMSG_NOALTEXCH(msg.getMessageNumber(), entry.getQueue().getName(), msg.getRoutingKey()));
+ }
+ }
+
+ private boolean isMaxDeliveryLimitExceeded(QueueEntry entry)
+ {
+ final int maxDeliveryLimit = entry.getQueue().getMaximumDeliveryCount();
+ return (maxDeliveryLimit > 0 && entry.getDeliveryCount() >= maxDeliveryLimit);
}
public void queueDeleted(AMQQueue queue)
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
index 7031502e34..ac95750e66 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSession.java
@@ -93,7 +93,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
{
public void onAccept();
- public void onRelease();
+ public void onRelease(boolean setRedelivered);
public void onReject();
@@ -230,13 +230,13 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
}
- public void release(RangeSet ranges)
+ public void release(RangeSet ranges, final boolean setRedelivered)
{
dispositionChange(ranges, new MessageDispositionAction()
{
public void performAction(MessageDispositionChangeListener listener)
{
- listener.onRelease();
+ listener.onRelease(setRedelivered);
}
});
}
@@ -350,7 +350,7 @@ public class ServerSession extends Session implements AuthorizationHolder, Sessi
_transaction.rollback();
for(MessageDispositionChangeListener listener : _messageDispositionListenerMap.values())
{
- listener.onRelease();
+ listener.onRelease(false);
}
_messageDispositionListenerMap.clear();
diff --git a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
index 754a233907..a0dca53ed0 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/transport/ServerSessionDelegate.java
@@ -148,7 +148,7 @@ public class ServerSessionDelegate extends SessionDelegate
@Override
public void messageRelease(Session session, MessageRelease method)
{
- ((ServerSession)session).release(method.getTransfers());
+ ((ServerSession)session).release(method.getTransfers(), method.getSetRedelivered());
}
@Override