summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java37
1 files changed, 28 insertions, 9 deletions
diff --git a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index dcbd887896..cb19532872 100644
--- a/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/qpid/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -490,26 +490,45 @@ public class AMQChannel
}
}
+ // Place to hold any error that occured during the requeueing.
+ AMQException error = null;
for (QueueEntry unacked : messagesToBeDelivered)
{
- if (!unacked.isQueueDeleted())
+ try
{
- // Mark message redelivered
- unacked.setRedelivered(true);
+ if (!unacked.isQueueDeleted())
+ {
+ // Mark message redelivered
+ unacked.setRedelivered(true);
- // Ensure message is released for redelivery
- unacked.release();
+ // Ensure message is released for redelivery
+ unacked.release();
- // Deliver Message
- deliveryContext.requeue(unacked);
+ // Deliver Message
+ deliveryContext.requeue(unacked);
+ }
+ else
+ {
+ unacked.dequeueAndDelete(_storeContext);
+ }
}
- else
+ catch (AMQException e)
{
- unacked.dequeueAndDelete(_storeContext);
+ //Log the error and store it
+ _log.error(e.getMessage(),e);
+ // We store the last seen exception for rethrowing after
+ // attempting to process all the entries.
+ error = e;
}
}
+ // If we had an error during the requeue process throw it now.
+ if (error != null)
+ {
+ throw error;
+ }
+
}
/**