summaryrefslogtreecommitdiff
path: root/qpid/java/client
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/java/client')
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java3
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java5
-rw-r--r--qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java2
3 files changed, 10 insertions, 0 deletions
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
index beaa47ed1e..954a3bc28f 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
@@ -116,6 +116,8 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
private boolean _disableMessageId;
private UUIDGen _messageIdGenerator = UUIDs.newGenerator();
+
+ protected String _userID; // ref user id used in the connection.
private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
@@ -138,6 +140,7 @@ public abstract class BasicMessageProducer extends Closeable implements org.apac
_immediate = immediate;
_mandatory = mandatory;
_waitUntilSent = waitUntilSent;
+ _userID = connection.getUsername();
}
void resubscribe() throws AMQException
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
index 02c5526e03..2810b37d48 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_10.java
@@ -77,6 +77,9 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
DeliveryProperties deliveryProp = delegate.getDeliveryProperties();
MessageProperties messageProps = delegate.getMessageProperties();
+ // On the receiving side, this will be read in to the JMSXUserID as well.
+ messageProps.setUserId(_userID.getBytes());
+
if (messageId != null)
{
messageProps.setMessageId(messageId);
@@ -159,6 +162,8 @@ public class BasicMessageProducer_0_10 extends BasicMessageProducer
{
ssn.sync();
}
+
+
}
catch (RuntimeException rte)
{
diff --git a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index c547fcb488..048065eac9 100644
--- a/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -86,6 +86,8 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
AMQMessageDelegate_0_8 delegate = (AMQMessageDelegate_0_8) message.getDelegate();
BasicContentHeaderProperties contentHeaderProperties = delegate.getContentHeaderProperties();
+ contentHeaderProperties.setUserId(_userID);
+
if (!_disableTimestamps)
{
final long currentTime = System.currentTimeMillis();