summaryrefslogtreecommitdiff
path: root/java/client
diff options
context:
space:
mode:
authorArnaud Simon <arnaudsimon@apache.org>2008-01-04 13:50:19 +0000
committerArnaud Simon <arnaudsimon@apache.org>2008-01-04 13:50:19 +0000
commitc0f40894de9c0b71afbf9515ab93ce2e55a4f8df (patch)
tree89761899563694ae6e96a99fb0d8e6ff7c2e8c87 /java/client
parentd66a9d83a468e1f25da0c0964d800d9a7db632ea (diff)
downloadqpid-python-c0f40894de9c0b71afbf9515ab93ce2e55a4f8df.tar.gz
cashed headers: see QPID-720
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@608840 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java51
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/Session.java2
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java12
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java10
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java9
-rw-r--r--java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java11
6 files changed, 80 insertions, 15 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 257d96bbe2..1551ca41ae 100644
--- a/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -27,6 +27,7 @@ import org.apache.qpid.url.URLSyntaxException;
import org.apache.qpidity.njms.ExceptionHelper;
import org.apache.qpidity.nclient.util.ByteBufferMessage;
import org.apache.qpidity.transport.ReplyTo;
+import org.apache.qpidity.transport.DeliveryProperties;
import javax.jms.Message;
import javax.jms.JMSException;
@@ -80,30 +81,56 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
}
}
+ DeliveryProperties deliveryProp = message.get010Message().getDeliveryProperties();
// set the delivery properties
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();
- message.get010Message().getDeliveryProperties().setTimestamp(currentTime);
+ deliveryProp.setTimestamp(currentTime);
if (timeToLive > 0)
{
- message.get010Message().getDeliveryProperties().setExpiration(currentTime + timeToLive);
+ deliveryProp.setExpiration(currentTime + timeToLive);
+ message.setJMSExpiration(currentTime + timeToLive);
}
else
{
- message.get010Message().getDeliveryProperties().setExpiration(0);
+ deliveryProp.setExpiration(0);
+ message.setJMSExpiration(0);
}
- origMessage.setJMSTimestamp(message.get010Message().getDeliveryProperties().getTimestamp());
+ message.setJMSTimestamp(currentTime);
}
- message.get010Message().getDeliveryProperties().setDeliveryMode((byte) deliveryMode);
- message.get010Message().getDeliveryProperties().setPriority((byte) priority);
- message.get010Message().getDeliveryProperties().setExchange(destination.getExchangeName().toString());
- message.get010Message().getDeliveryProperties().setRoutingKey(destination.getRoutingKey().toString());
- origMessage.setJMSPriority(message.get010Message().getDeliveryProperties().getPriority());
- origMessage.setJMSExpiration(message.get010Message().getDeliveryProperties().getExpiration());
- origMessage.setJMSMessageID(message.getJMSMessageID());
- origMessage.setJMSDeliveryMode(deliveryMode);
+ if (deliveryProp.getDeliveryMode() != deliveryMode)
+ {
+ deliveryProp.setDeliveryMode((byte) deliveryMode);
+ message.setJMSDeliveryMode(deliveryMode);
+ }
+ if (deliveryProp.getPriority() != priority)
+ {
+ deliveryProp.setPriority((byte) priority);
+ message.setJMSPriority(priority);
+ }
+ String excahngeName = destination.getExchangeName().toString();
+ if ( deliveryProp.getExchange() == null || ! deliveryProp.getExchange().equals(excahngeName))
+ {
+ deliveryProp.setExchange(excahngeName);
+ }
+ String routingKey = destination.getRoutingKey().toString();
+ if (deliveryProp.getRoutingKey() == null || ! deliveryProp.getRoutingKey().equals(routingKey))
+ {
+ deliveryProp.setRoutingKey(routingKey);
+ }
+
+ if (message != origMessage)
+ {
+ _logger.debug("Updating original message");
+ origMessage.setJMSPriority(message.getJMSPriority());
+ origMessage.setJMSTimestamp(message.getJMSTimestamp());
+ _logger.debug("Setting JMSExpiration:" + message.getJMSExpiration());
+ origMessage.setJMSExpiration(message.getJMSExpiration());
+ origMessage.setJMSMessageID(message.getJMSMessageID());
+ origMessage.setJMSDeliveryMode(deliveryMode);
+ }
BasicContentHeaderProperties contentHeaderProperties = message.getContentHeaderProperties();
if (contentHeaderProperties.reset())
{
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
index 76735f8925..3707807f70 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
@@ -186,7 +186,7 @@ public interface Session
* @see org.apache.qpidity.transport.DeliveryProperties
* @see org.apache.qpidity.transport.MessageProperties
*/
- public void header(Struct... headers);
+ public Header header(Struct... headers);
/**
* Add the following byte array to the content of the message being sent.
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
index 34f902061e..d9434419da 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
@@ -74,7 +74,17 @@ public class ClientSession extends org.apache.qpidity.transport.Session implemen
// therefore reading the content in one shot.
ByteBuffer data = msg.readData();
super.messageTransfer(destination, confirmMode, acquireMode);
- super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+ // super.header(msg.getDeliveryProperties(),msg.getMessageProperties() );
+ if( msg.getHeader() == null || msg.getDeliveryProperties().isDirty() || msg.getMessageProperties().isDirty() )
+ {
+ msg.setHeader( super.header(msg.getDeliveryProperties(),msg.getMessageProperties()) );
+ msg.getDeliveryProperties().setDirty(false);
+ msg.getMessageProperties().setDirty(false);
+ }
+ else
+ {
+ super.header(msg.getHeader());
+ }
data( data );
endData();
}
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
index 5e87d63966..56443e2aeb 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
@@ -7,6 +7,7 @@ import java.util.Queue;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
/**
@@ -27,6 +28,15 @@ public class ByteBufferMessage implements Message
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
private long _transferId;
+ private Header _header;
+
+ public void setHeader(Header header) {
+ _header = header;
+ }
+
+ public Header getHeader() {
+ return _header;
+ }
public ByteBufferMessage()
{
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
index 6d925a0ad3..308a16ce36 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
@@ -9,6 +9,7 @@ import java.nio.channels.FileChannel;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
/**
@@ -52,6 +53,14 @@ public class FileMessage extends ReadOnlyMessage implements Message
}
}
+ public void setHeader(Header header) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Header getHeader() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void readData(byte[] target) throws IOException
{
throw new UnsupportedOperationException();
diff --git a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
index 2422de3877..fd3e812cbc 100644
--- a/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
+++ b/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
@@ -7,6 +7,7 @@ import java.nio.channels.SocketChannel;
import org.apache.qpidity.transport.DeliveryProperties;
import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
import org.apache.qpidity.api.Message;
public class StreamingMessage extends ReadOnlyMessage implements Message
@@ -14,7 +15,15 @@ public class StreamingMessage extends ReadOnlyMessage implements Message
SocketChannel _socChannel;
private int _chunkSize;
private ByteBuffer _readBuf;
-
+
+ public Header getHeader() {
+ return null; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setHeader(Header header) {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
{
_messageProperties = messageProperties;