diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 15:23:55 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-02-15 15:23:55 +0000 |
| commit | 9ffd924daedfc7d1d3c2e072befaf6645aef671e (patch) | |
| tree | 7321379b681f2025cffe9e8e5c0497c993125b89 /java/broker/src | |
| parent | a22f3f594d6eee7d610fb4f140e18cddd7c880f6 (diff) | |
| download | qpid-python-9ffd924daedfc7d1d3c2e072befaf6645aef671e.tar.gz | |
Fixes to get TransactedTest back, there are still unresolved issues with rollback(), however.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507960 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
| -rw-r--r-- | java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java | 85 |
1 files changed, 41 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 245e762fa4..00243f865b 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -639,50 +639,47 @@ public class AMQChannel /** * Called to resend all outstanding unacknowledged messages to this same channel. */ - public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException - { - throw new Error("XXX"); -// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); -// -// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() -// { -// public boolean callback(UnacknowledgedMessage message) throws AMQException -// { -// long deliveryTag = message.deliveryTag; -// AMQShortString consumerTag = message.consumerTag; -// AMQMessage msg = message.message; -// msg.setRedelivered(true); -// // working -// // deliver(msg, consumerTag, deliveryTag); -// // trunk -// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) -// { -// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); -// } -// else -// { -// // Message has no consumer tag, so was "delivered" to a GET -// // or consumer no longer registered -// // cannot resend, so re-queue. -// if (message.queue != null && (consumerTag == null || requeue)) -// { -// msgToRequeue.add(message); -// } -// } -// // false means continue processing -// return false; -// } -// -// public void visitComplete() -// { -// } -// }); -// -// for(UnacknowledgedMessage message : msgToRequeue) -// { -// _txnContext.deliver(message.message, message.queue); -// _unacknowledgedMessageMap.remove(message.deliveryTag); -// } + public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException + { + final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>(); + + _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor() + { + public boolean callback(UnacknowledgedMessage message) throws AMQException + { + long deliveryTag = message.deliveryTag; + AMQShortString consumerTag = message.consumerTag; + AMQMessage msg = message.message; + msg.setRedelivered(true); + if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag)) + { + deliver(msg, consumerTag, deliveryTag); + //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag); + } + else + { + // Message has no consumer tag, so was "delivered" to a GET + // or consumer no longer registered + // cannot resend, so re-queue. + if (message.queue != null && (consumerTag == null || requeue)) + { + msgToRequeue.add(message); + } + } + // false means continue processing + return false; + } + + public void visitComplete() + { + } + }); + + for(UnacknowledgedMessage message : msgToRequeue) + { + _txnContext.deliver(message.message, message.queue); + _unacknowledgedMessageMap.remove(message.deliveryTag); + } } /** |
