summaryrefslogtreecommitdiff
path: root/java/client/src/main
diff options
context:
space:
mode:
authorRajith Muditha Attapattu <rajith@apache.org>2010-07-08 18:27:01 +0000
committerRajith Muditha Attapattu <rajith@apache.org>2010-07-08 18:27:01 +0000
commit555b22988d27b1b6fb66280905d7c71becb59c43 (patch)
tree6e45b93be4c279a8aa7519aabae1eb38571e5a77 /java/client/src/main
parentd4a39c33f6f0e3d88a385cdbf0eb942a60ceb781 (diff)
downloadqpid-python-555b22988d27b1b6fb66280905d7c71becb59c43.tar.gz
This is a fix for QPID-2723
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@961866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src/main')
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession.java24
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java6
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java32
-rw-r--r--java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java5
4 files changed, 53 insertions, 14 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
index 3390dfef8b..10ee8a8a0d 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
@@ -549,7 +549,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public BytesMessage createBytesMessage() throws JMSException
{
checkNotClosed();
- return new JMSBytesMessage(getMessageDelegateFactory());
+ JMSBytesMessage msg = new JMSBytesMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
/**
@@ -1014,11 +1016,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
checkNotClosed();
if (USE_AMQP_ENCODED_MAP_MESSAGE)
{
- return new AMQPEncodedMapMessage(getMessageDelegateFactory());
+ AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
else
{
- return new JMSMapMessage(getMessageDelegateFactory());
+ JMSMapMessage msg = new JMSMapMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
}
@@ -1030,7 +1036,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
public ObjectMessage createObjectMessage() throws JMSException
{
checkNotClosed();
- return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory());
+ JMSObjectMessage msg = new JMSObjectMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
public ObjectMessage createObjectMessage(Serializable object) throws JMSException
@@ -1240,7 +1248,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
- return new JMSStreamMessage(getMessageDelegateFactory());
+ JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
}
@@ -1319,7 +1329,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic
{
checkNotClosed();
- return new JMSTextMessage(getMessageDelegateFactory());
+ JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory());
+ msg.setAMQSession(this);
+ return msg;
}
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
index 55cf5fe64b..aeceec4f57 100644
--- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java
@@ -1190,7 +1190,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
}
}
- int resolveAddressType(AMQDestination dest) throws AMQException
+ public int resolveAddressType(AMQDestination dest) throws AMQException
{
int type = dest.getAddressType();
String name = dest.getAddressName();
@@ -1254,7 +1254,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
Collections.<String,Object>emptyMap()));
}
- private void setLegacyFiledsForQueueType(AMQDestination dest)
+ public void setLegacyFiledsForQueueType(AMQDestination dest)
{
// legacy support
dest.setQueueName(new AMQShortString(dest.getAddressName()));
@@ -1263,7 +1263,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic
dest.setRoutingKey(dest.getAMQQueueName());
}
- private void setLegacyFiledsForTopicType(AMQDestination dest)
+ public void setLegacyFiledsForTopicType(AMQDestination dest)
{
// legacy support
dest.setQueueName(null);
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
index 56f5a00e5c..6e5974b85c 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java
@@ -34,10 +34,13 @@ import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageFormatException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
+import org.apache.qpid.AMQException;
import org.apache.qpid.collections.ReferenceMap;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.AMQSession_0_10;
import org.apache.qpid.client.CustomJMSXProperty;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.jms.Message;
@@ -248,6 +251,29 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
final AMQDestination amqd = (AMQDestination) destination;
+ if (amqd.getDestSyntax() == AMQDestination.DestSyntax.ADDR)
+ {
+ try
+ {
+ int type = ((AMQSession_0_10)_session).resolveAddressType(amqd);
+ if (type == AMQDestination.QUEUE_TYPE)
+ {
+ ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd);
+ }
+ else
+ {
+ ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd);
+ }
+ }
+ catch(AMQException ex)
+ {
+ JMSException e = new JMSException("Error occured while figuring out the node type");
+ e.initCause(ex);
+ e.setLinkedException(ex);
+ throw e;
+ }
+ }
+
final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString());
_destinationCache.put(replyTo, destination);
_messageProps.setReplyTo(replyTo);
@@ -772,14 +798,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null)
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
if (_session.getAMQConnection().isClosed())
{
throw new javax.jms.IllegalStateException("Connection is already closed");
}
- // we set multiple to true here since acknowledgement implies acknowledge of all previous messages
+ // we set multiple to true here since acknowledgment implies acknowledge of all previous messages
// received on the session
_session.acknowledgeMessage(_deliveryTag, true);
}
@@ -787,7 +813,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate
public void acknowledge() throws JMSException
{
- if (_session != null)
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_session.acknowledge();
}
diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
index 96787e5a32..1d415b3c5f 100644
--- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
+++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java
@@ -30,6 +30,7 @@ import java.util.UUID;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.MessageNotWriteableException;
+import javax.jms.Session;
import org.apache.qpid.client.AMQDestination;
import org.apache.qpid.client.AMQQueue;
@@ -523,7 +524,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
{
// the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
// is not specified. In our case, we only set the session field where client acknowledge mode is specified.
- if (_session != null)
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
if (_session.getAMQConnection().isClosed())
{
@@ -538,7 +539,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate
public void acknowledge() throws JMSException
{
- if (_session != null)
+ if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE)
{
_session.acknowledge();
}