From e2e91cd29a4898a03b7a34bdc1105ea70cdafd09 Mon Sep 17 00:00:00 2001 From: Kim van der Riet Date: Thu, 15 Feb 2007 19:30:50 +0000 Subject: Adjusted mechanism for sending refs. (still in progress) git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508098 13f79535-47bb-0310-9956-ffa450edef68 --- .../java/org/apache/qpid/server/AMQChannel.java | 60 ++++++++++++---------- 1 file changed, 33 insertions(+), 27 deletions(-) (limited to 'java') diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java index 752452ab38..9214263fe3 100644 --- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java +++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java @@ -409,37 +409,43 @@ public class AMQChannel public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag) { final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes(); - AMQMethodBody openBody = MessageOpenBody.createMethodBody( - _session.getProtocolMajorVersion(), // AMQP major version - _session.getProtocolMinorVersion(), // AMQP minor version - refId); - _session.writeRequest(_channelId, openBody, new AMQMethodListener() - { - public boolean methodReceived(AMQMethodEvent evt) throws AMQException - { - AMQMethodBody method = evt.getMethod(); - if (_log.isDebugEnabled()) - { - _log.debug(method + " received on channel " + _channelId); - } - if (method instanceof MessageOkBody) - { - acknowledgeMessage(deliveryTag, false); - deliverRef(refId, msg, destination, _session.getStateManager()); - return true; - } - else - { - // TODO: implement reject - return false; - } - } - public void error(Exception e) {} - }); + deliverRef(refId, msg, destination, _session.getStateManager()); +// AMQMethodBody openBody = MessageOpenBody.createMethodBody( +// _session.getProtocolMajorVersion(), // AMQP major version +// _session.getProtocolMinorVersion(), // AMQP minor version +// refId); +// _session.writeRequest(_channelId, openBody, new AMQMethodListener() +// { +// public boolean methodReceived(AMQMethodEvent evt) throws AMQException +// { +// AMQMethodBody method = evt.getMethod(); +// if (_log.isDebugEnabled()) +// { +// _log.debug(method + " received on channel " + _channelId); +// } +// if (method instanceof MessageOkBody) +// { +// acknowledgeMessage(deliveryTag, false); +// deliverRef(refId, msg, destination, _session.getStateManager()); +// return true; +// } +// else +// { +// // TODO: implement reject +// return false; +// } +// } +// public void error(Exception e) {} +// }); } public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener) { + AMQMethodBody openBody = MessageOpenBody.createMethodBody( + _session.getProtocolMajorVersion(), // AMQP major version + _session.getProtocolMinorVersion(), // AMQP minor version + refId); + _session.writeRequest(_channelId, openBody, listener); MessageTransferBody mtb = msg.getTransferBody().copy(); mtb.destination = destination; mtb.redelivered = msg.isRedelivered(); -- cgit v1.2.1