diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-19 18:52:08 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2007-01-19 18:52:08 +0000 |
| commit | 22875e2e7f7128b0a2bbe38bd3932656912d725e (patch) | |
| tree | 8b662246e6f11749f4a980a887c92ebb5bd13510 /java/client/src | |
| parent | df04f9dcf9d869f146923ac150b7110b1224f690 (diff) | |
| download | qpid-python-22875e2e7f7128b0a2bbe38bd3932656912d725e.tar.gz | |
added logic to handle the Reference case when transfering large messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
| -rw-r--r-- | java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java | 166 |
1 files changed, 102 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index c585a66c58..505491f9df 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -32,6 +32,9 @@ import org.apache.qpid.framing.*; import javax.jms.*; import java.io.UnsupportedEncodingException; import java.util.Enumeration; +import java.util.Iterator; +import java.util.LinkedList; +import java.util.List; public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer { @@ -559,49 +562,43 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } int size = (payload != null) ? payload.limit() : 0; - Content[] content = createContent(payload); - if (content.length > 0 && _logger.isDebugEnabled()) - { - _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination); - } - for (int i = 0; i < content.length; i++) - { - try - { - AMQMethodBody methodBody = MessageTransferBody.createMethodBody( - (byte)0, (byte)9, // AMQP version (major, minor) - messageHeaders.getAppId(), // String appId - messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders - content[i], // Content body - messageHeaders.getEncoding(), // String contentEncoding - messageHeaders.getContentType(), // String contentType - messageHeaders.getCorrelationId(), // String correlationId - (short)deliveryMode, // short deliveryMode - messageHeaders.getDestination(), // String destination - destination.getExchangeName(), // String exchange - messageHeaders.getExpiration(), // long expiration - immediate, // boolean immediate - messageHeaders.getMessageId(), // String messageId - (short)priority, // short priority - message.getJMSRedelivered(), // boolean redelivered - messageHeaders.getReplyTo(), // String replyTo - destination.getRoutingKey(), // String routingKey - new String("abc123").getBytes(), // byte[] securityToken - 0, // int ticket - messageHeaders.getTimestamp(), // long timestamp - messageHeaders.getTransactionId(), // String transactionId - timeToLive, // long ttl - messageHeaders.getUserId()); // String userId + final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize(); - _protocolHandler.writeRequest(_channelId, methodBody); - } - catch (AMQException e) - { - throw new JMSException(e.toString()); - } - } - - + if (size < framePayloadMax){ + // Inline message case + _logger.debug("Inline case, sending data inline with the transfer method"); + Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE,payload.array()); + doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate); + } else { + // Reference message case + // Sequence is as follows + // 1. Message.open + // 2. Message.Transfer + // 3. "n" of Message.append + // 4. Message.close + List content = createContent(payload); + if(_logger.isDebugEnabled()) + { + _logger.debug("Reference case, sending data as chunks"); + _logger.debug("Sending " + content.size() + " Message.Transfer frames to " + destination); + } + // Message.Open + String referenceId = generateReferenceId(); + doMessageOpen(referenceId); + + // Message.Transfer + Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_REFERENCE,referenceId.getBytes()); + doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate); + + //Message.Append + for(Iterator it = content.iterator(); it.hasNext();){ + doMessageAppend(referenceId,(byte[])it.next()); + } + + //Message.Close + doMessageClose(referenceId); + } + if (message != origMessage) { _logger.warn("Updating original message"); @@ -612,6 +609,59 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j origMessage.setJMSMessageID(message.getJMSMessageID()); } } + + private void doMessageTransfer(MessageHeaders messageHeaders,AMQDestination destination, Content content, AbstractJMSMessage message, int deliveryMode, int priority, + long timeToLive, boolean immediate)throws JMSException{ + try + { + AMQMethodBody methodBody = MessageTransferBody.createMethodBody( + (byte)0, (byte)9, // AMQP version (major, minor) + messageHeaders.getAppId(), // String appId + messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders + content, // Content body + messageHeaders.getEncoding(), // String contentEncoding + messageHeaders.getContentType(), // String contentType + messageHeaders.getCorrelationId(), // String correlationId + (short)deliveryMode, // short deliveryMode + messageHeaders.getDestination(), // String destination + destination.getExchangeName(), // String exchange + messageHeaders.getExpiration(), // long expiration + immediate, // boolean immediate + messageHeaders.getMessageId(), // String messageId + (short)priority, // short priority + message.getJMSRedelivered(), // boolean redelivered + messageHeaders.getReplyTo(), // String replyTo + destination.getRoutingKey(), // String routingKey + new String("abc123").getBytes(), // byte[] securityToken + 0, // int ticket + messageHeaders.getTimestamp(), // long timestamp + messageHeaders.getTransactionId(), // String transactionId + timeToLive, // long ttl + messageHeaders.getUserId()); // String userId + + _protocolHandler.writeRequest(_channelId, methodBody); + } + catch (AMQException e) + { + throw new JMSException(e.toString()); + } + } + + private void doMessageOpen(String referenceId){ + AMQMethodBody methodBody = MessageOpenBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes()); + } + + private void doMessageAppend(String referenceId,byte[] data){ + AMQMethodBody methodBody = MessageAppendBody.createMethodBody((byte)0, (byte)9, data, referenceId.getBytes()); + } + + private void doMessageClose(String referenceId){ + AMQMethodBody methodBody = MessageCloseBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes()); + } + + private String generateReferenceId(){ + return String.valueOf(System.currentTimeMillis()); + } private void checkTemporaryDestination(AMQDestination destination) throws JMSException { @@ -631,7 +681,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j } } } - + /** * Create content bodies. This will split a large message into numerous bodies depending on the negotiated * maximum frame size. @@ -639,34 +689,22 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j * @param payload * @return the array of content bodies */ - private Content[] createContent(ByteBuffer payload) + private List createContent(ByteBuffer payload) { - if (payload == null || payload.remaining() == 0) - { - return NO_CONTENT; - } - int dataLength = payload.remaining(); final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize(); int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0; int frameCount = (int) (dataLength / framePayloadMax) + lastFrame; - final Content[] bodies = new Content[frameCount]; + List bodies = new LinkedList(); - if (frameCount == 1) + long remaining = dataLength; + for (int i = 0; i < frameCount + lastFrame; i++) { - bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload); - } - else - { - long remaining = dataLength; - for (int i = 0; i < bodies.length; i++) - { - payload.position((int) framePayloadMax * i); - int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; - payload.limit(payload.position() + length); - bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice()); - remaining -= length; - } + payload.position((int) framePayloadMax * i); + int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining; + payload.limit(payload.position() + length); + bodies.add(payload.slice().array()); + remaining -= length; } return bodies; } |
