summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-02-15 19:30:50 +0000
committerKim van der Riet <kpvdr@apache.org>2007-02-15 19:30:50 +0000
commite2e91cd29a4898a03b7a34bdc1105ea70cdafd09 (patch)
treea6f8ad537836e4f5fd6b98b3212147b85bd879c4 /java
parent366bacef0be607c82053b960255926e27a7fe0cf (diff)
downloadqpid-python-e2e91cd29a4898a03b7a34bdc1105ea70cdafd09.tar.gz
Adjusted mechanism for sending refs. (still in progress)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@508098 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java60
1 files changed, 33 insertions, 27 deletions
diff --git a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
index 752452ab38..9214263fe3 100644
--- a/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
+++ b/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
@@ -409,37 +409,43 @@ public class AMQChannel
public void deliverRef(final AMQMessage msg, final AMQShortString destination, final long deliveryTag)
{
final byte[] refId = String.valueOf(System.currentTimeMillis()).getBytes();
- AMQMethodBody openBody = MessageOpenBody.createMethodBody(
- _session.getProtocolMajorVersion(), // AMQP major version
- _session.getProtocolMinorVersion(), // AMQP minor version
- refId);
- _session.writeRequest(_channelId, openBody, new AMQMethodListener()
- {
- public boolean methodReceived(AMQMethodEvent evt) throws AMQException
- {
- AMQMethodBody method = evt.getMethod();
- if (_log.isDebugEnabled())
- {
- _log.debug(method + " received on channel " + _channelId);
- }
- if (method instanceof MessageOkBody)
- {
- acknowledgeMessage(deliveryTag, false);
- deliverRef(refId, msg, destination, _session.getStateManager());
- return true;
- }
- else
- {
- // TODO: implement reject
- return false;
- }
- }
- public void error(Exception e) {}
- });
+ deliverRef(refId, msg, destination, _session.getStateManager());
+// AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+// _session.getProtocolMajorVersion(), // AMQP major version
+// _session.getProtocolMinorVersion(), // AMQP minor version
+// refId);
+// _session.writeRequest(_channelId, openBody, new AMQMethodListener()
+// {
+// public boolean methodReceived(AMQMethodEvent evt) throws AMQException
+// {
+// AMQMethodBody method = evt.getMethod();
+// if (_log.isDebugEnabled())
+// {
+// _log.debug(method + " received on channel " + _channelId);
+// }
+// if (method instanceof MessageOkBody)
+// {
+// acknowledgeMessage(deliveryTag, false);
+// deliverRef(refId, msg, destination, _session.getStateManager());
+// return true;
+// }
+// else
+// {
+// // TODO: implement reject
+// return false;
+// }
+// }
+// public void error(Exception e) {}
+// });
}
public void deliverRef(byte[] refId, AMQMessage msg, AMQShortString destination, AMQMethodListener listener)
{
+ AMQMethodBody openBody = MessageOpenBody.createMethodBody(
+ _session.getProtocolMajorVersion(), // AMQP major version
+ _session.getProtocolMinorVersion(), // AMQP minor version
+ refId);
+ _session.writeRequest(_channelId, openBody, listener);
MessageTransferBody mtb = msg.getTransferBody().copy();
mtb.destination = destination;
mtb.redelivered = msg.isRedelivered();