diff options
| author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-07-08 18:27:01 +0000 |
|---|---|---|
| committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-07-08 18:27:01 +0000 |
| commit | 555b22988d27b1b6fb66280905d7c71becb59c43 (patch) | |
| tree | 6e45b93be4c279a8aa7519aabae1eb38571e5a77 /java/client/src | |
| parent | d4a39c33f6f0e3d88a385cdbf0eb942a60ceb781 (diff) | |
| download | qpid-python-555b22988d27b1b6fb66280905d7c71becb59c43.tar.gz | |
This is a fix for QPID-2723
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@961866 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'java/client/src')
4 files changed, 53 insertions, 14 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java index 3390dfef8b..10ee8a8a0d 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession.java @@ -549,7 +549,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public BytesMessage createBytesMessage() throws JMSException { checkNotClosed(); - return new JMSBytesMessage(getMessageDelegateFactory()); + JMSBytesMessage msg = new JMSBytesMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } /** @@ -1014,11 +1016,15 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic checkNotClosed(); if (USE_AMQP_ENCODED_MAP_MESSAGE) { - return new AMQPEncodedMapMessage(getMessageDelegateFactory()); + AMQPEncodedMapMessage msg = new AMQPEncodedMapMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } else { - return new JMSMapMessage(getMessageDelegateFactory()); + JMSMapMessage msg = new JMSMapMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } } @@ -1030,7 +1036,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic public ObjectMessage createObjectMessage() throws JMSException { checkNotClosed(); - return (ObjectMessage) new JMSObjectMessage(getMessageDelegateFactory()); + JMSObjectMessage msg = new JMSObjectMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } public ObjectMessage createObjectMessage(Serializable object) throws JMSException @@ -1240,7 +1248,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); - return new JMSStreamMessage(getMessageDelegateFactory()); + JMSStreamMessage msg = new JMSStreamMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } } @@ -1319,7 +1329,9 @@ public abstract class AMQSession<C extends BasicMessageConsumer, P extends Basic { checkNotClosed(); - return new JMSTextMessage(getMessageDelegateFactory()); + JMSTextMessage msg = new JMSTextMessage(getMessageDelegateFactory()); + msg.setAMQSession(this); + return msg; } } diff --git a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java index 55cf5fe64b..aeceec4f57 100644 --- a/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_10.java @@ -1190,7 +1190,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic } } - int resolveAddressType(AMQDestination dest) throws AMQException + public int resolveAddressType(AMQDestination dest) throws AMQException { int type = dest.getAddressType(); String name = dest.getAddressName(); @@ -1254,7 +1254,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic Collections.<String,Object>emptyMap())); } - private void setLegacyFiledsForQueueType(AMQDestination dest) + public void setLegacyFiledsForQueueType(AMQDestination dest) { // legacy support dest.setQueueName(new AMQShortString(dest.getAddressName())); @@ -1263,7 +1263,7 @@ public class AMQSession_0_10 extends AMQSession<BasicMessageConsumer_0_10, Basic dest.setRoutingKey(dest.getAMQQueueName()); } - private void setLegacyFiledsForTopicType(AMQDestination dest) + public void setLegacyFiledsForTopicType(AMQDestination dest) { // legacy support dest.setQueueName(null); diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java index 56f5a00e5c..6e5974b85c 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_10.java @@ -34,10 +34,13 @@ import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageFormatException; import javax.jms.MessageNotWriteableException; +import javax.jms.Session; +import org.apache.qpid.AMQException; import org.apache.qpid.collections.ReferenceMap; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQSession; +import org.apache.qpid.client.AMQSession_0_10; import org.apache.qpid.client.CustomJMSXProperty; import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.jms.Message; @@ -248,6 +251,29 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate final AMQDestination amqd = (AMQDestination) destination; + if (amqd.getDestSyntax() == AMQDestination.DestSyntax.ADDR) + { + try + { + int type = ((AMQSession_0_10)_session).resolveAddressType(amqd); + if (type == AMQDestination.QUEUE_TYPE) + { + ((AMQSession_0_10)_session).setLegacyFiledsForQueueType(amqd); + } + else + { + ((AMQSession_0_10)_session).setLegacyFiledsForTopicType(amqd); + } + } + catch(AMQException ex) + { + JMSException e = new JMSException("Error occured while figuring out the node type"); + e.initCause(ex); + e.setLinkedException(ex); + throw e; + } + } + final ReplyTo replyTo = new ReplyTo(amqd.getExchangeName().toString(), amqd.getRoutingKey().toString()); _destinationCache.put(replyTo, destination); _messageProps.setReplyTo(replyTo); @@ -772,14 +798,14 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - if (_session != null) + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { if (_session.getAMQConnection().isClosed()) { throw new javax.jms.IllegalStateException("Connection is already closed"); } - // we set multiple to true here since acknowledgement implies acknowledge of all previous messages + // we set multiple to true here since acknowledgment implies acknowledge of all previous messages // received on the session _session.acknowledgeMessage(_deliveryTag, true); } @@ -787,7 +813,7 @@ public class AMQMessageDelegate_0_10 extends AbstractAMQMessageDelegate public void acknowledge() throws JMSException { - if (_session != null) + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _session.acknowledge(); } diff --git a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java index 96787e5a32..1d415b3c5f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java +++ b/java/client/src/main/java/org/apache/qpid/client/message/AMQMessageDelegate_0_8.java @@ -30,6 +30,7 @@ import java.util.UUID; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.MessageNotWriteableException; +import javax.jms.Session; import org.apache.qpid.client.AMQDestination; import org.apache.qpid.client.AMQQueue; @@ -523,7 +524,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate { // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge // is not specified. In our case, we only set the session field where client acknowledge mode is specified. - if (_session != null) + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { if (_session.getAMQConnection().isClosed()) { @@ -538,7 +539,7 @@ public class AMQMessageDelegate_0_8 extends AbstractAMQMessageDelegate public void acknowledge() throws JMSException { - if (_session != null) + if (_session != null && _session.getAcknowledgeMode() == Session.CLIENT_ACKNOWLEDGE) { _session.acknowledge(); } |
