From 9ffd924daedfc7d1d3c2e072befaf6645aef671e Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 15 Feb 2007 15:23:55 +0000 Subject: Fixes to get TransactedTest back, there are still unresolved issues with rollback(), however. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507960 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 85 +++++++++++----------- 1 file changed, 41 insertions(+), 44 deletions(-) (limited to 'java/broker/src') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 245e762fa4..00243f865b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -639,50 +639,47 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException - { - throw new Error("XXX"); -// final List msgToRequeue = new LinkedList(); -// -// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() -// { -// public boolean callback(UnacknowledgedMessage message) throws AMQException -// { -// long deliveryTag = message.deliveryTag; -// AMQShortString consumerTag = message.consumerTag; -// AMQMessage msg = message.message; -// msg.setRedelivered(true); -// // working -// // deliver(msg, consumerTag, deliveryTag); -// // trunk -// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) -// { -// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); -// } -// else -// { -// // Message has no consumer tag, so was "delivered" to a GET -// // or consumer no longer registered -// // cannot resend, so re-queue. -// if (message.queue != null && (consumerTag == null || requeue)) -// { -// msgToRequeue.add(message); -// } -// } -// // false means continue processing -// return false; -// } -// -// public void visitComplete() -// { -// } -// }); -// -// for(UnacknowledgedMessage message : msgToRequeue) -// { -// _txnContext.deliver(message.message, message.queue); -// _unacknowledgedMessageMap.remove(message.deliveryTag); -// } + public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + { + final List msgToRequeue = new LinkedList(); + + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + long deliveryTag = message.deliveryTag; + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + { + deliver(msg, consumerTag, deliveryTag); + //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null && (consumerTag == null || requeue)) + { + msgToRequeue.add(message); + } + } + // false means continue processing + return false; + } + + public void visitComplete() + { + } + }); + + for(UnacknowledgedMessage message : msgToRequeue) + { + _txnContext.deliver(message.message, message.queue); + _unacknowledgedMessageMap.remove(message.deliveryTag); + } } /** -- cgit v1.2.1