diff options
Diffstat (limited to 'java/cluster/src')
| -rw-r--r-- | java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java | 14 | ||||
| -rw-r--r-- | java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java | 9 |
2 files changed, 15 insertions, 8 deletions
diff --git a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java index 51c57efdae..7e5563460f 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java @@ -22,13 +22,19 @@ package org.apache.qpid.server.cluster; import org.apache.qpid.AMQException; import org.apache.qpid.framing.AMQFrame; -import org.apache.qpid.framing.ContentBody; +import org.apache.qpid.framing.MethodConverter_8_0; +import org.apache.qpid.framing.abstraction.ContentChunk; +import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter; import org.apache.qpid.server.queue.AMQMessage; import java.util.Iterator; public class SimpleSendable implements Sendable { + + //todo fixme - remove 0-8 hard coding + ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0(); + private final AMQMessage _message; public SimpleSendable(AMQMessage message) @@ -38,12 +44,12 @@ public class SimpleSendable implements Sendable public void send(int channel, Member member) throws AMQException { - member.send(new AMQFrame(channel, _message.getPublishBody())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo()))); member.send(new AMQFrame(channel, _message.getContentHeaderBody())); - Iterator<ContentBody> it = _message.getContentBodyIterator(); + Iterator<ContentChunk> it = _message.getContentBodyIterator(); while (it.hasNext()) { - member.send(new AMQFrame(channel, it.next())); + member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next()))); } } } diff --git a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java index d0a64c7d6f..2a83d65ae5 100644 --- a/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java +++ b/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java @@ -31,8 +31,6 @@ import org.apache.qpid.server.cluster.SimpleSendable; import org.apache.qpid.server.cluster.util.LogMessage; import org.apache.qpid.server.virtualhost.VirtualHost; -import java.util.concurrent.Executor; - /** * TODO: separate out an abstract base class from AMQQueue from which this inherits. It does * not require all the functionality currently in AMQQueue. @@ -81,8 +79,11 @@ public class RemoteQueueProxy extends AMQQueue void relay(AMQMessage msg) throws AMQException { - BasicPublishBody publish = msg.getPublishBody(); - publish.immediate = false; //can't as yet handle the immediate flag in a cluster + // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object + // if cluster can handle immediate then it should wrap the wrapper... + +// BasicPublishBody publish = msg.getMessagePublishInfo(); +// publish.immediate = false; //can't as yet handle the immediate flag in a cluster // send this on to the broker for which it is acting as proxy: _groupMgr.send(_target, new SimpleSendable(msg)); |
