summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorKim van der Riet <kpvdr@apache.org>2007-01-17 19:49:40 +0000
committerKim van der Riet <kpvdr@apache.org>2007-01-17 19:49:40 +0000
commit844e632c013affdfd028704a195a746f6973ebb2 (patch)
tree2a2fe63ad432e9dbeae557e5e30dc51654b9cdca /java/client
parente6998705ab84f21e92595b0186958516165476f8 (diff)
downloadqpid-python-844e632c013affdfd028704a195a746f6973ebb2.tar.gz
Conversion of client/BasicMessageProducer to new Message class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497138 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java101
1 files changed, 48 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index 1d2ca35016..391ce87ae4 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.message.AbstractJMSMessage;
import org.apache.qpid.client.message.JMSBytesMessage;
+import org.apache.qpid.client.message.MessageHeaders;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.framing.*;
@@ -100,7 +101,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
private final boolean _mandatory;
private final boolean _waitUntilSent;
- private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
+ private static final Content[] NO_CONTENT = new Content[0];
protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted,
int channelId, AMQSession session, AMQProtocolHandler protocolHandler,
@@ -515,24 +516,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @throws JMSException
*/
protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
- long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
+ long timeToLive, boolean mandatory, boolean immediate, boolean wait)
+ throws JMSException
{
checkTemporaryDestination(destination);
origMessage.setJMSDestination(destination);
-
AbstractJMSMessage message = convertToNativeMessage(origMessage);
message.getMessageHeaders().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL());
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- // Be aware of possible changes to parameter order as versions change.
- AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId,
- (byte)0, (byte)9, // AMQP version (major, minor)
- destination.getExchangeName(), // exchange
- immediate, // immediate
- mandatory, // mandatory
- destination.getRoutingKey(), // routingKey
- 0); // ticket
long currentTime = 0;
if (!_disableTimestamps)
@@ -542,53 +533,61 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
message.prepareForSending();
ByteBuffer payload = message.getData();
- BasicContentHeaderProperties contentHeaderProperties = message.getMessageHeaders();
+ MessageHeaders messageHeaders = message.getMessageHeaders();
if (timeToLive > 0)
{
if (!_disableTimestamps)
{
- contentHeaderProperties.setExpiration(currentTime + timeToLive);
+ messageHeaders.setExpiration(currentTime + timeToLive);
}
}
else
{
if (!_disableTimestamps)
{
- contentHeaderProperties.setExpiration(0);
+ messageHeaders.setExpiration(0);
}
}
- contentHeaderProperties.setDeliveryMode((byte) deliveryMode);
- contentHeaderProperties.setPriority((byte) priority);
+// messageHeaders.setDeliveryMode((byte) deliveryMode);
+// messageHeaders.setPriority((byte) priority);
int size = (payload != null) ? payload.limit() : 0;
- ContentBody[] contentBodies = createContentBodies(payload);
- AMQFrame[] frames = new AMQFrame[2 + contentBodies.length];
- for (int i = 0; i < contentBodies.length; i++)
- {
- frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]);
- }
- if (contentBodies.length > 0 && _logger.isDebugEnabled())
- {
- _logger.debug("Sending content body frames to " + destination);
- }
-
- // weight argument of zero indicates no child content headers, just bodies
- // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9)
- // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
- AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)0, (byte)9), 0,
- contentHeaderProperties,
- size);
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Sending content header frame to " + destination);
+ Content[] content = createContent(payload);
+ if (content.length > 0 && _logger.isDebugEnabled())
+ {
+ _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
+ }
+ for (int i = 0; i < content.length; i++)
+ {
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content[i], // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ false, // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
+
+ _protocolHandler.writeRequest(_channelId, methodBody);
}
- frames[0] = publishFrame;
- frames[1] = contentHeaderFrame;
- CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
- _protocolHandler.writeFrame(compositeFrame, wait);
-
if (message != origMessage)
{
@@ -627,36 +626,32 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param payload
* @return the array of content bodies
*/
- private ContentBody[] createContentBodies(ByteBuffer payload)
+ private Content[] createContent(ByteBuffer payload)
{
if (payload == null || payload.remaining() == 0)
{
- return NO_CONTENT_BODIES;
+ return NO_CONTENT;
}
- // we substract one from the total frame maximum size to account for the end of frame marker in a body frame
- // (0xCE byte).
int dataLength = payload.remaining();
- final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1;
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final ContentBody[] bodies = new ContentBody[frameCount];
+ final Content[] bodies = new Content[frameCount];
if (frameCount == 1)
{
- bodies[0] = new ContentBody();
- bodies[0].payload = payload;
+ bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array());
}
else
{
long remaining = dataLength;
for (int i = 0; i < bodies.length; i++)
{
- bodies[i] = new ContentBody();
payload.position((int) framePayloadMax * i);
int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
payload.limit(payload.position() + length);
- bodies[i].payload = payload.slice();
+ bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array());
remaining -= length;
}
}