summaryrefslogtreecommitdiff
path: root/java
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2007-11-22 19:05:33 +0000
committerArnaud Simon <arnaudsimon@apache.org>2007-11-22 19:05:33 +0000
commit17beb4896a7092384f475ee85eb120c7e16a0b12 (patch)
treea198fb4209fd9ce0efe342fe09fc29c27bd170d5 /java
parentf8122dafa6d2ccca5577c730b4b94ab016eef522 (diff)
downloadqpid-python-17beb4896a7092384f475ee85eb120c7e16a0b12.tar.gz
optimized message creation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@597478 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java2
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java132
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java45
3 files changed, 108 insertions, 71 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index df0feff201..4c45ec6638 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -456,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
type = AMQDestination.UNKNOWN_TYPE;
}
- message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
+ // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type);
sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive,
mandatory, immediate, wait);
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 6033b96c0f..32665c2a24 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -51,9 +51,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(),
destination.getExchangeClass().toString(),
- null,
- null
- );
+ null, null);
}
//--- Overwritten methods
@@ -66,97 +64,91 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
boolean wait) throws JMSException
{
message.prepareForSending();
- org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage();
- if(_logger.isDebugEnabled())
+ if (message.get010Message() == null)
{
- _logger.debug("Message Props: " + message.toString());
- }
- try
- {
- if (message.getData() != null)
+ message.set010Message(new ByteBufferMessage());
+ if (message.getData() == null)
{
- qpidityMessage.appendData(message.getData().buf());
- }
- else
- {
- qpidityMessage.appendData(ByteBuffer.allocate(0));
+ try
+ {
+ message.get010Message().appendData(ByteBuffer.allocate(0));
+ }
+ catch (IOException e)
+ {
+ throw new JMSException(e.getMessage());
+ }
}
}
- catch (IOException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+
// set the delivery properties
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
- qpidityMessage.getDeliveryProperties().setTimestamp(currentTime);
+ message.get010Message().getDeliveryProperties().setTimestamp(currentTime);
if (timeToLive > 0)
{
- qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive);
+ message.get010Message().getDeliveryProperties().setExpiration(currentTime + timeToLive);
}
else
{
- qpidityMessage.getDeliveryProperties().setExpiration(0);
+ message.get010Message().getDeliveryProperties().setExpiration(0);
}
+ origMessage.setJMSTimestamp(message.get010Message().getDeliveryProperties().getTimestamp());
}
- qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
- qpidityMessage.getDeliveryProperties().setPriority((byte) priority);
- qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString());
- qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
+ message.get010Message().getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
+ message.get010Message().getDeliveryProperties().setPriority((byte) priority);
+ message.get010Message().getDeliveryProperties().setExchange(destination.getExchangeName().toString());
+ message.get010Message().getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
+ origMessage.setJMSPriority(message.get010Message().getDeliveryProperties().getPriority());
+ origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ origMessage.setJMSDeliveryMode(deliveryMode);
+
BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
- // set the application properties
- qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString());
- AMQShortString type = contentHeaderProperties.getType();
- if( type != null )
+ if (contentHeaderProperties.reset())
{
- qpidityMessage.getMessageProperties().setType( type.toString());
- }
- qpidityMessage.getMessageProperties().setMessageId(message.getJMSMessageID()) ;
- AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
- if( correlationID != null )
- {
- qpidityMessage.getMessageProperties().setCorrelationId(correlationID.toString());
- }
- String replyToURL = contentHeaderProperties.getReplyToAsString();
- if (replyToURL != null)
- {
- AMQBindingURL dest;
- try
+ // set the application properties
+ message.get010Message().getMessageProperties()
+ .setContentType(contentHeaderProperties.getContentType().toString());
+ AMQShortString type = contentHeaderProperties.getType();
+ if (type != null)
{
- dest = new AMQBindingURL(replyToURL);
+ message.get010Message().getMessageProperties().setType(type.toString());
}
- catch (URLSyntaxException e)
+ message.get010Message().getMessageProperties().setMessageId(message.getJMSMessageID());
+ AMQShortString correlationID = contentHeaderProperties.getCorrelationId();
+ if (correlationID != null)
{
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ message.get010Message().getMessageProperties().setCorrelationId(correlationID.toString());
}
- qpidityMessage.getMessageProperties()
- .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
- }
- if (contentHeaderProperties.getHeaders() != null)
- {
- //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix
- contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
- qpidityMessage.getMessageProperties().setApplicationHeaders(FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders()));
- for(String key:qpidityMessage.getMessageProperties().getApplicationHeaders().keySet())
+ String replyToURL = contentHeaderProperties.getReplyToAsString();
+ if (replyToURL != null)
{
- _logger.debug(key + "=" + qpidityMessage.getMessageProperties().getApplicationHeaders().get(key));
+ AMQBindingURL dest;
+ try
+ {
+ dest = new AMQBindingURL(replyToURL);
+ }
+ catch (URLSyntaxException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ message.get010Message().getMessageProperties()
+ .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString()));
+ }
+ if (contentHeaderProperties.getHeaders() != null)
+ {
+ //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix
+ contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName());
+ message.get010Message().getMessageProperties()
+ .setApplicationHeaders(FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders()));
}
}
- if(_logger.isDebugEnabled() )
- {
- _logger.debug("Updating original message");
- }
- origMessage.setJMSPriority(qpidityMessage.getDeliveryProperties().getPriority());
- origMessage.setJMSTimestamp(qpidityMessage.getDeliveryProperties().getTimestamp());
- origMessage.setJMSExpiration(qpidityMessage.getDeliveryProperties().getExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
- origMessage.setJMSDeliveryMode(deliveryMode);
// send the message
try
{
((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(),
- qpidityMessage,
+ message.get010Message(),
org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED,
org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE);
}
@@ -164,19 +156,19 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
throw ExceptionHelper.convertQpidExceptionToJMSException(e);
}
- catch(RuntimeException rte)
+ catch (RuntimeException rte)
{
- JMSException ex = new JMSException("Exception when sending message");
+ JMSException ex = new JMSException("Exception when sending message");
ex.setLinkedException(rte);
throw ex;
}
-
}
public boolean isBound(AMQDestination destination) throws JMSException
{
- return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), destination.getRoutingKey());
+ return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(),
+ destination.getRoutingKey());
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
index 800f092b52..87fadd5e9b 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
@@ -42,6 +42,7 @@ import java.util.Collections;
import java.util.Enumeration;
import java.util.Map;
import java.util.UUID;
+import java.io.IOException;
public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
{
@@ -58,6 +59,48 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
private BasicMessageConsumer _consumer;
private boolean _strictAMQP;
+ /**
+ * This is 0_10 specific
+ */
+ private org.apache.qpidity.api.Message _010message = null;
+
+ public void set010Message(org.apache.qpidity.api.Message m )
+ {
+ _010message = m;
+ }
+
+ public void dataChanged()
+ {
+ if (_010message != null)
+ {
+ _010message.clearData();
+ try
+ {
+ if (_data != null)
+ {
+ _010message.appendData(_data.buf());
+ }
+ else
+ {
+ _010message.appendData(java.nio.ByteBuffer.allocate(0));
+ }
+ }
+ catch (IOException e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+ }
+
+ /**
+ * End 010 specific
+ */
+
+ public org.apache.qpidity.api.Message get010Message()
+ {
+ return _010message;
+ }
+
protected AbstractJMSMessage(ByteBuffer data)
{
super(new BasicContentHeaderProperties());
@@ -652,6 +695,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
{
throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable");
}
+ _contentHeaderProperties.updated();
}
public boolean isReadable()
@@ -673,6 +717,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach
else
{
_data.flip();
+ dataChanged();
_changedData = false;
}
}