summaryrefslogtreecommitdiff
path: root/java/broker/src
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-15 15:23:55 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-15 15:23:55 +0000
commit9ffd924daedfc7d1d3c2e072befaf6645aef671e (patch)
tree7321379b681f2025cffe9e8e5c0497c993125b89 /java/broker/src
parenta22f3f594d6eee7d610fb4f140e18cddd7c880f6 (diff)
downloadqpid-python-9ffd924daedfc7d1d3c2e072befaf6645aef671e.tar.gz
Fixes to get TransactedTest back, there are still unresolved issues with rollback(), however.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@507960 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/broker/src')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java85
1 files changed, 41 insertions, 44 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 245e762fa4..00243f865b 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -639,50 +639,47 @@ public class AMQChannel
/**
* Called to resend all outstanding unacknowledged messages to this same channel.
*/
- public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
- {
- throw new Error("XXX");
-// final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
-//
-// _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
-// {
-// public boolean callback(UnacknowledgedMessage message) throws AMQException
-// {
-// long deliveryTag = message.deliveryTag;
-// AMQShortString consumerTag = message.consumerTag;
-// AMQMessage msg = message.message;
-// msg.setRedelivered(true);
-// // working
-// // deliver(msg, consumerTag, deliveryTag);
-// // trunk
-// if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
-// {
-// msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
-// }
-// else
-// {
-// // Message has no consumer tag, so was "delivered" to a GET
-// // or consumer no longer registered
-// // cannot resend, so re-queue.
-// if (message.queue != null && (consumerTag == null || requeue))
-// {
-// msgToRequeue.add(message);
-// }
-// }
-// // false means continue processing
-// return false;
-// }
-//
-// public void visitComplete()
-// {
-// }
-// });
-//
-// for(UnacknowledgedMessage message : msgToRequeue)
-// {
-// _txnContext.deliver(message.message, message.queue);
-// _unacknowledgedMessageMap.remove(message.deliveryTag);
-// }
+ public void resend(final AMQProtocolSession session, final boolean requeue) throws AMQException
+ {
+ final List<UnacknowledgedMessage> msgToRequeue = new LinkedList<UnacknowledgedMessage>();
+
+ _unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ long deliveryTag = message.deliveryTag;
+ AMQShortString consumerTag = message.consumerTag;
+ AMQMessage msg = message.message;
+ msg.setRedelivered(true);
+ if((consumerTag != null) && _consumerTag2QueueMap.containsKey(consumerTag))
+ {
+ deliver(msg, consumerTag, deliveryTag);
+ //msg.writeDeliver(session, _channelId, deliveryTag, consumerTag);
+ }
+ else
+ {
+ // Message has no consumer tag, so was "delivered" to a GET
+ // or consumer no longer registered
+ // cannot resend, so re-queue.
+ if (message.queue != null && (consumerTag == null || requeue))
+ {
+ msgToRequeue.add(message);
+ }
+ }
+ // false means continue processing
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ }
+ });
+
+ for(UnacknowledgedMessage message : msgToRequeue)
+ {
+ _txnContext.deliver(message.message, message.queue);
+ _unacknowledgedMessageMap.remove(message.deliveryTag);
+ }
}
/**