summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java85
1 files changed, 41 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 245e762fa4..00243f865b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -639,50 +639,47 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
- {
- throw new Error("XXX");
-// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
-//
-// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-// {
-// public boolean callback(UnacknowledgedMessage message) throws AMQException
-// {
-// long deliveryTag = message.deliveryTag;
-// AMQShortString consumerTag = message.consumerTag;
-// AMQMessage msg = message.message;
-// msg.setRedelivered(true);
-// // working
-// // deliver(msg, consumerTag, deliveryTag);
-// // trunk
-// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
-// {
-// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
-// }
-// else
-// {
-// // Message has no consumer tag, so was "delivered" to a GET
-// // or consumer no longer registered
-// // cannot resend, so re-queue.
-// if (message.queue != null && (consumerTag == null || requeue))
-// {
-// msgToRequeue.add(message);
-// }
-// }
-// // false means continue processing
-// return false;
-// }
-//
-// public void visitComplete()
-// {
-// }
-// });
-//
-// for(UnacknowledgedMessage message : msgToRequeue)
-// {
-// _txnContext.deliver(message.message, message.queue);
-// _unacknowledgedMessageMap.remove(message.deliveryTag);
-// }
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ {
+ final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ long deliveryTag = message.deliveryTag;
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ deliver(msg, consumerTag, deliveryTag);
+ //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null || requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**