summaryrefslogtreecommitdiff
path: root/java/client/src
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2007-01-19 18:52:08 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2007-01-19 18:52:08 +0000
commit22875e2e7f7128b0a2bbe38bd3932656912d725e (patch)
tree8b662246e6f11749f4a980a887c92ebb5bd13510 /java/client/src
parentdf04f9dcf9d869f146923ac150b7110b1224f690 (diff)
downloadqpid-python-22875e2e7f7128b0a2bbe38bd3932656912d725e.tar.gz
added logic to handle the Reference case when transfering large messages
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-9@497906 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java166
1 files changed, 102 insertions, 64 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index c585a66c58..505491f9df 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -32,6 +32,9 @@ import org.apache.qpid.framing.*;
import javax.jms.*;
import java.io.UnsupportedEncodingException;
import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
{
@@ -559,49 +562,43 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
int size = (payload != null) ? payload.limit() : 0;
- Content[] content = createContent(payload);
- if (content.length > 0 && _logger.isDebugEnabled())
- {
- _logger.debug("Sending " + content.length + " Message.Transfer frames to " + destination);
- }
- for (int i = 0; i < content.length; i++)
- {
- try
- {
- AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
- (byte)0, (byte)9, // AMQP version (major, minor)
- messageHeaders.getAppId(), // String appId
- messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
- content[i], // Content body
- messageHeaders.getEncoding(), // String contentEncoding
- messageHeaders.getContentType(), // String contentType
- messageHeaders.getCorrelationId(), // String correlationId
- (short)deliveryMode, // short deliveryMode
- messageHeaders.getDestination(), // String destination
- destination.getExchangeName(), // String exchange
- messageHeaders.getExpiration(), // long expiration
- immediate, // boolean immediate
- messageHeaders.getMessageId(), // String messageId
- (short)priority, // short priority
- message.getJMSRedelivered(), // boolean redelivered
- messageHeaders.getReplyTo(), // String replyTo
- destination.getRoutingKey(), // String routingKey
- new String("abc123").getBytes(), // byte[] securityToken
- 0, // int ticket
- messageHeaders.getTimestamp(), // long timestamp
- messageHeaders.getTransactionId(), // String transactionId
- timeToLive, // long ttl
- messageHeaders.getUserId()); // String userId
+ final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
- _protocolHandler.writeRequest(_channelId, methodBody);
- }
- catch (AMQException e)
- {
- throw new JMSException(e.toString());
- }
- }
-
-
+ if (size < framePayloadMax){
+ // Inline message case
+ _logger.debug("Inline case, sending data inline with the transfer method");
+ Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE,payload.array());
+ doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+ } else {
+ // Reference message case
+ // Sequence is as follows
+ // 1. Message.open
+ // 2. Message.Transfer
+ // 3. "n" of Message.append
+ // 4. Message.close
+ List content = createContent(payload);
+ if(_logger.isDebugEnabled())
+ {
+ _logger.debug("Reference case, sending data as chunks");
+ _logger.debug("Sending " + content.size() + " Message.Transfer frames to " + destination);
+ }
+ // Message.Open
+ String referenceId = generateReferenceId();
+ doMessageOpen(referenceId);
+
+ // Message.Transfer
+ Content data = new Content(Content.ContentTypeEnum.CONTENT_TYPE_REFERENCE,referenceId.getBytes());
+ doMessageTransfer(messageHeaders,destination,data,message,deliveryMode,priority,timeToLive,immediate);
+
+ //Message.Append
+ for(Iterator it = content.iterator(); it.hasNext();){
+ doMessageAppend(referenceId,(byte[])it.next());
+ }
+
+ //Message.Close
+ doMessageClose(referenceId);
+ }
+
if (message != origMessage)
{
_logger.warn("Updating original message");
@@ -612,6 +609,59 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
origMessage.setJMSMessageID(message.getJMSMessageID());
}
}
+
+ private void doMessageTransfer(MessageHeaders messageHeaders,AMQDestination destination, Content content, AbstractJMSMessage message, int deliveryMode, int priority,
+ long timeToLive, boolean immediate)throws JMSException{
+ try
+ {
+ AMQMethodBody methodBody = MessageTransferBody.createMethodBody(
+ (byte)0, (byte)9, // AMQP version (major, minor)
+ messageHeaders.getAppId(), // String appId
+ messageHeaders.getJMSHeaders(), // FieldTable applicationHeaders
+ content, // Content body
+ messageHeaders.getEncoding(), // String contentEncoding
+ messageHeaders.getContentType(), // String contentType
+ messageHeaders.getCorrelationId(), // String correlationId
+ (short)deliveryMode, // short deliveryMode
+ messageHeaders.getDestination(), // String destination
+ destination.getExchangeName(), // String exchange
+ messageHeaders.getExpiration(), // long expiration
+ immediate, // boolean immediate
+ messageHeaders.getMessageId(), // String messageId
+ (short)priority, // short priority
+ message.getJMSRedelivered(), // boolean redelivered
+ messageHeaders.getReplyTo(), // String replyTo
+ destination.getRoutingKey(), // String routingKey
+ new String("abc123").getBytes(), // byte[] securityToken
+ 0, // int ticket
+ messageHeaders.getTimestamp(), // long timestamp
+ messageHeaders.getTransactionId(), // String transactionId
+ timeToLive, // long ttl
+ messageHeaders.getUserId()); // String userId
+
+ _protocolHandler.writeRequest(_channelId, methodBody);
+ }
+ catch (AMQException e)
+ {
+ throw new JMSException(e.toString());
+ }
+ }
+
+ private void doMessageOpen(String referenceId){
+ AMQMethodBody methodBody = MessageOpenBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+ }
+
+ private void doMessageAppend(String referenceId,byte[] data){
+ AMQMethodBody methodBody = MessageAppendBody.createMethodBody((byte)0, (byte)9, data, referenceId.getBytes());
+ }
+
+ private void doMessageClose(String referenceId){
+ AMQMethodBody methodBody = MessageCloseBody.createMethodBody((byte)0, (byte)9, referenceId.getBytes());
+ }
+
+ private String generateReferenceId(){
+ return String.valueOf(System.currentTimeMillis());
+ }
private void checkTemporaryDestination(AMQDestination destination) throws JMSException
{
@@ -631,7 +681,7 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
}
}
}
-
+
/**
* Create content bodies. This will split a large message into numerous bodies depending on the negotiated
* maximum frame size.
@@ -639,34 +689,22 @@ public class BasicMessageProducer extends Closeable implements org.apache.qpid.j
* @param payload
* @return the array of content bodies
*/
- private Content[] createContent(ByteBuffer payload)
+ private List createContent(ByteBuffer payload)
{
- if (payload == null || payload.remaining() == 0)
- {
- return NO_CONTENT;
- }
-
int dataLength = payload.remaining();
final long framePayloadMax = _session.getAMQConnection().getMaximumFrameSize();
int lastFrame = (dataLength % framePayloadMax) > 0 ? 1 : 0;
int frameCount = (int) (dataLength / framePayloadMax) + lastFrame;
- final Content[] bodies = new Content[frameCount];
+ List bodies = new LinkedList();
- if (frameCount == 1)
+ long remaining = dataLength;
+ for (int i = 0; i < frameCount + lastFrame; i++)
{
- bodies[0] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload);
- }
- else
- {
- long remaining = dataLength;
- for (int i = 0; i < bodies.length; i++)
- {
- payload.position((int) framePayloadMax * i);
- int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
- payload.limit(payload.position() + length);
- bodies[i] = new Content(Content.ContentTypeEnum.CONTENT_TYPE_INLINE, payload.slice());
- remaining -= length;
- }
+ payload.position((int) framePayloadMax * i);
+ int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
+ payload.limit(payload.position() + length);
+ bodies.add(payload.slice().array());
+ remaining -= length;
}
return bodies;
}