diff options
| author | Kim van der Riet <kpvdr@apache.org> | 2007-01-17 19:49:40 +0000 |
|---|---|---|
| committer | Kim van der Riet <kpvdr@apache.org> | 2007-01-17 19:49:40 +0000 |
| commit | 844e632c013affdfd028704a195a746f6973ebb2 (patch) | |
| tree | 2a2fe63ad432e9dbeae557e5e30dc51654b9cdca /java/client | |
| parent | e6998705ab84f21e92595b0186958516165476f8 (diff) | |
| download | qpid-python-844e632c013affdfd028704a195a746f6973ebb2.tar.gz | |
Conversion of client/BasicMessageProducer to new Message class
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497138 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 101 |
1 files changed, 48 insertions, 53 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index 1d2ca35016..391ce87ae4 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -25,6 +25,7 @@ import org.apache.mina.common.ByteBuffer; import org.apache.qpid.AMQException; import org.apache.qpid.client.message.AbstractJMSMessage; import org.apache.qpid.client.message.JMSBytesMessage; +import org.apache.qpid.client.message.MessageHeaders; import org.apache.qpid.client.protocol.AMQProtocolHandler; import org.apache.qpid.framing.*; @@ -100,7 +101,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j private final boolean _mandatory; private final boolean _waitUntilSent; - private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0]; + private static final Content[] NO_CONTENT = new Content[0]; protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId, AMQSession session, AMQProtocolHandler protocolHandler, @@ -515,24 +516,14 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @throws JMSException */ protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, - long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException + long timeToLive, boolean mandatory, boolean immediate, boolean wait) + throws JMSException { checkTemporaryDestination(destination); origMessage.setJMSDestination(destination); - AbstractJMSMessage message = convertToNativeMessage(origMessage); message.getMessageHeaders().getJMSHeaders().setString(CustomJMXProperty.JMSX_QPID_JMSDESTINATIONURL.toString(), destination.toURL()); - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - // Be aware of possible changes to parameter order as versions change. - AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, - (byte)0, (byte)9, // AMQP version (major, minor) - destination.getExchangeName(), // exchange - immediate, // immediate - mandatory, // mandatory - destination.getRoutingKey(), // routingKey - 0); // ticket long currentTime = 0; if (!_disableTimestamps) @@ -542,53 +533,61 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } message.prepareForSending(); ByteBuffer payload = message.getData(); - BasicContentHeaderProperties contentHeaderProperties = message.getMessageHeaders(); + MessageHeaders messageHeaders = message.getMessageHeaders(); if (timeToLive > 0) { if (!_disableTimestamps) { - contentHeaderProperties.setExpiration(currentTime + timeToLive); + messageHeaders.setExpiration(currentTime + timeToLive); } } else { if (!_disableTimestamps) { - contentHeaderProperties.setExpiration(0); + messageHeaders.setExpiration(0); } } - contentHeaderProperties.setDeliveryMode((byte) deliveryMode); - contentHeaderProperties.setPriority((byte) priority); +// messageHeaders.setDeliveryMode((byte) deliveryMode); +// messageHeaders.setPriority((byte) priority); int size = (payload != null) ? payload.limit() : 0; - ContentBody[] contentBodies = createContentBodies(payload); - AMQFrame[] frames = new AMQFrame[2 + contentBodies.length]; - for (int i = 0; i < contentBodies.length; i++) - { - frames[2 + i] = ContentBody.createAMQFrame(_channelId, contentBodies[i]); - } - if (contentBodies.length > 0 && _logger.isDebugEnabled()) - { - _logger.debug("Sending content body frames to " + destination); - } - - // weight argument of zero indicates no child content headers, just bodies - // AMQP version change: Hardwire the version to 0-9 (major=0, minor=9) - // TODO: Connect this to the session version obtained from ProtocolInitiation for this session. - AMQFrame contentHeaderFrame = ContentHeaderBody.createAMQFrame(_channelId, BasicConsumeBody.getClazz((byte)0, (byte)9), 0, - contentHeaderProperties, - size); - if (_logger.isDebugEnabled()) - { - _logger.debug("Sending content header frame to " + destination); + Content[] content = createContent(payload); + if (content.length > 0 && _logger.isDebugEnabled()) + { + _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination); + } + for (int i = 0; i < content.length; i++) + { + AMQMethodBody methodBody = MessageTransferBody.createMethodBody( + (byte)0, (byte)9, // AMQP version (major, minor) + messageHeaders.getAppId(), // String appId + messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders + content[i], // Content body + messageHeaders.getEncoding(), // String contentEncoding + messageHeaders.getContentType(), // String contentType + messageHeaders.getCorrelationId(), // String correlationId + (short)deliveryMode, // short deliveryMode + messageHeaders.getDestination(), // String destination + destination.getExchangeName(), // String exchange + messageHeaders.getExpiration(), // long expiration + immediate, // boolean immediate + messageHeaders.getMessageId(), // String messageId + (short)priority, // short priority + false, // boolean redelivered + messageHeaders.getReplyTo(), // String replyTo + destination.getRoutingKey(), // String routingKey + new String("abc123").getBytes(), // byte[] securityToken + 0, // int ticket + messageHeaders.getTimestamp(), // long timestamp + messageHeaders.getTransactionId(), // String transactionId + timeToLive, // long ttl + messageHeaders.getUserId()); // String userId + + _protocolHandler.writeRequest(_channelId, methodBody); } - frames[0] = publishFrame; - frames[1] = contentHeaderFrame; - CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames); - _protocolHandler.writeFrame(compositeFrame, wait); - if (message != origMessage) { @@ -627,36 +626,32 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param payload * @return the array of content bodies */ - private ContentBody[] createContentBodies(ByteBuffer payload) + private Content[] createContent(ByteBuffer payload) { if (payload == null || payload.remaining() == 0) { - return NO_CONTENT_BODIES; + return NO_CONTENT; } - // we substract one from the total frame maximum size to account for the end of frame marker in a body frame - // (0xCE byte). int dataLength = payload.remaining(); - final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize() - 1; + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize(); int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - final ContentBody[] bodies = new ContentBody[frameCount]; + final Content[] bodies = new Content[frameCount]; if (frameCount == 1) { - bodies[0] = new ContentBody(); - bodies[0].payload = payload; + bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.array()); } else { long remaining = dataLength; for (int i = 0; i < bodies.length; i++) { - bodies[i] = new ContentBody(); payload.position((int) framePayloadMax * i); int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; payload.limit(payload.position() + length); - bodies[i].payload = payload.slice(); + bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice().array()); remaining -= length; } } |
