diff options
| author | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-22 19:05:33 +0000 |
|---|---|---|
| committer | Arnaud Simon <arnaudsimon@apache.org> | 2007-11-22 19:05:33 +0000 |
| commit | 17beb4896a7092384f475ee85eb120c7e16a0b12 (patch) | |
| tree | a198fb4209fd9ce0efe342fe09fc29c27bd170d5 /java | |
| parent | f8122dafa6d2ccca5577c730b4b94ab016eef522 (diff) | |
| download | qpid-python-17beb4896a7092384f475ee85eb120c7e16a0b12.tar.gz | |
optimized message creation
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@597478 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java')
3 files changed, 108 insertions, 71 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java index df0feff201..4c45ec6638 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java @@ -456,7 +456,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac type = AMQDestination.UNKNOWN_TYPE; } - message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); + // message.getJmsHeaders().setInteger(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName(), type); sendMessage(destination, origMessage, message, deliveryMode, priority, timeToLive, mandatory, immediate, wait); diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java index 6033b96c0f..32665c2a24 100644 --- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java @@ -51,9 +51,7 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { ((AMQSession_0_10) getSession()).getQpidSession().exchangeDeclare(destination.getExchangeName().toString(), destination.getExchangeClass().toString(), - null, - null - ); + null, null); } //--- Overwritten methods @@ -66,97 +64,91 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer boolean wait) throws JMSException { message.prepareForSending(); - org.apache.qpidity.api.Message qpidityMessage = new ByteBufferMessage(); - if(_logger.isDebugEnabled()) + if (message.get010Message() == null) { - _logger.debug("Message Props: " + message.toString()); - } - try - { - if (message.getData() != null) + message.set010Message(new ByteBufferMessage()); + if (message.getData() == null) { - qpidityMessage.appendData(message.getData().buf()); - } - else - { - qpidityMessage.appendData(ByteBuffer.allocate(0)); + try + { + message.get010Message().appendData(ByteBuffer.allocate(0)); + } + catch (IOException e) + { + throw new JMSException(e.getMessage()); + } } } - catch (IOException e) - { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); - } + // set the delivery properties if (!_disableTimestamps) { final long currentTime = System.currentTimeMillis(); - qpidityMessage.getDeliveryProperties().setTimestamp(currentTime); + message.get010Message().getDeliveryProperties().setTimestamp(currentTime); if (timeToLive > 0) { - qpidityMessage.getDeliveryProperties().setExpiration(currentTime + timeToLive); + message.get010Message().getDeliveryProperties().setExpiration(currentTime + timeToLive); } else { - qpidityMessage.getDeliveryProperties().setExpiration(0); + message.get010Message().getDeliveryProperties().setExpiration(0); } + origMessage.setJMSTimestamp(message.get010Message().getDeliveryProperties().getTimestamp()); } - qpidityMessage.getDeliveryProperties().setDeliveryMode((byte) deliveryMode); - qpidityMessage.getDeliveryProperties().setPriority((byte) priority); - qpidityMessage.getDeliveryProperties().setExchange(destination.getExchangeName().toString()); - qpidityMessage.getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString()); + message.get010Message().getDeliveryProperties().setDeliveryMode((byte) deliveryMode); + message.get010Message().getDeliveryProperties().setPriority((byte) priority); + message.get010Message().getDeliveryProperties().setExchange(destination.getExchangeName().toString()); + message.get010Message().getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString()); + origMessage.setJMSPriority(message.get010Message().getDeliveryProperties().getPriority()); + origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration()); + origMessage.setJMSMessageID(message.getJMSMessageID()); + origMessage.setJMSDeliveryMode(deliveryMode); + BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties(); - // set the application properties - qpidityMessage.getMessageProperties().setContentType(contentHeaderProperties.getContentType().toString()); - AMQShortString type = contentHeaderProperties.getType(); - if( type != null ) + if (contentHeaderProperties.reset()) { - qpidityMessage.getMessageProperties().setType( type.toString()); - } - qpidityMessage.getMessageProperties().setMessageId(message.getJMSMessageID()) ; - AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); - if( correlationID != null ) - { - qpidityMessage.getMessageProperties().setCorrelationId(correlationID.toString()); - } - String replyToURL = contentHeaderProperties.getReplyToAsString(); - if (replyToURL != null) - { - AMQBindingURL dest; - try + // set the application properties + message.get010Message().getMessageProperties() + .setContentType(contentHeaderProperties.getContentType().toString()); + AMQShortString type = contentHeaderProperties.getType(); + if (type != null) { - dest = new AMQBindingURL(replyToURL); + message.get010Message().getMessageProperties().setType(type.toString()); } - catch (URLSyntaxException e) + message.get010Message().getMessageProperties().setMessageId(message.getJMSMessageID()); + AMQShortString correlationID = contentHeaderProperties.getCorrelationId(); + if (correlationID != null) { - throw ExceptionHelper.convertQpidExceptionToJMSException(e); + message.get010Message().getMessageProperties().setCorrelationId(correlationID.toString()); } - qpidityMessage.getMessageProperties() - .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); - } - if (contentHeaderProperties.getHeaders() != null) - { - //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix - contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); - qpidityMessage.getMessageProperties().setApplicationHeaders(FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders())); - for(String key:qpidityMessage.getMessageProperties().getApplicationHeaders().keySet()) + String replyToURL = contentHeaderProperties.getReplyToAsString(); + if (replyToURL != null) { - _logger.debug(key + "=" + qpidityMessage.getMessageProperties().getApplicationHeaders().get(key)); + AMQBindingURL dest; + try + { + dest = new AMQBindingURL(replyToURL); + } + catch (URLSyntaxException e) + { + throw ExceptionHelper.convertQpidExceptionToJMSException(e); + } + message.get010Message().getMessageProperties() + .setReplyTo(new ReplyTo(dest.getExchangeName().toString(), dest.getRoutingKey().toString())); + } + if (contentHeaderProperties.getHeaders() != null) + { + //JMS_QPID_DESTTYPE is always set but useles so this is a temporary fix + contentHeaderProperties.getHeaders().remove(CustomJMSXProperty.JMS_QPID_DESTTYPE.getShortStringName()); + message.get010Message().getMessageProperties() + .setApplicationHeaders(FiledTableSupport.convertToMap(contentHeaderProperties.getHeaders())); } } - if(_logger.isDebugEnabled() ) - { - _logger.debug("Updating original message"); - } - origMessage.setJMSPriority(qpidityMessage.getDeliveryProperties().getPriority()); - origMessage.setJMSTimestamp(qpidityMessage.getDeliveryProperties().getTimestamp()); - origMessage.setJMSExpiration(qpidityMessage.getDeliveryProperties().getExpiration()); - origMessage.setJMSMessageID(message.getJMSMessageID()); - origMessage.setJMSDeliveryMode(deliveryMode); // send the message try { ((AMQSession_0_10) getSession()).getQpidSession().messageTransfer(destination.getExchangeName().toString(), - qpidityMessage, + message.get010Message(), org.apache.qpidity.nclient.Session.TRANSFER_CONFIRM_MODE_NOT_REQUIRED, org.apache.qpidity.nclient.Session.TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE); } @@ -164,19 +156,19 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer { throw ExceptionHelper.convertQpidExceptionToJMSException(e); } - catch(RuntimeException rte) + catch (RuntimeException rte) { - JMSException ex = new JMSException("Exception when sending message"); + JMSException ex = new JMSException("Exception when sending message"); ex.setLinkedException(rte); throw ex; } - } public boolean isBound(AMQDestination destination) throws JMSException { - return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), destination.getRoutingKey()); + return _session.isQueueBound(destination.getExchangeName(), destination.getAMQQueueName(), + destination.getRoutingKey()); } } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java index 800f092b52..87fadd5e9b 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java @@ -42,6 +42,7 @@ import java.util.Collections; import java.util.Enumeration; import java.util.Map; import java.util.UUID; +import java.io.IOException; public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message { @@ -58,6 +59,48 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach private BasicMessageConsumer _consumer; private boolean _strictAMQP; + /** + * This is 0_10 specific + */ + private org.apache.qpidity.api.Message _010message = null; + + public void set010Message(org.apache.qpidity.api.Message m ) + { + _010message = m; + } + + public void dataChanged() + { + if (_010message != null) + { + _010message.clearData(); + try + { + if (_data != null) + { + _010message.appendData(_data.buf()); + } + else + { + _010message.appendData(java.nio.ByteBuffer.allocate(0)); + } + } + catch (IOException e) + { + throw new RuntimeException(e); + } + } + } + + /** + * End 010 specific + */ + + public org.apache.qpidity.api.Message get010Message() + { + return _010message; + } + protected AbstractJMSMessage(ByteBuffer data) { super(new BasicContentHeaderProperties()); @@ -652,6 +695,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach { throw new MessageNotWriteableException("You need to call clearProperties() to make the message writable"); } + _contentHeaderProperties.updated(); } public boolean isReadable() @@ -673,6 +717,7 @@ public abstract class AbstractJMSMessage extends AMQMessage implements org.apach else { _data.flip(); + dataChanged(); _changedData = false; } } |
