diff options
| author | Robert Godfrey <rgodfrey@apache.org> | 2013-09-30 07:29:51 +0000 |
|---|---|---|
| committer | Robert Godfrey <rgodfrey@apache.org> | 2013-09-30 07:29:51 +0000 |
| commit | c8d0fb167d8fc89fcb27823414454675b60a9dc1 (patch) | |
| tree | 587e486d46940256b1107cee59d3c61ecf9f3f29 /qpid/java | |
| parent | cce85c2b500a08719e551b756d41b34246535416 (diff) | |
| download | qpid-python-c8d0fb167d8fc89fcb27823414454675b60a9dc1.tar.gz | |
QPID-5177 : Set the default and supported outcomes on sending links in the amqp 1.0 JMS client
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk@1527467 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'qpid/java')
8 files changed, 58 insertions, 34 deletions
diff --git a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java index ada100f0a6..cc93725fb0 100644 --- a/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java +++ b/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java @@ -18,9 +18,8 @@ */ package org.apache.qpid.amqp_1_0.jms.impl; -import org.apache.qpid.amqp_1_0.client.ConnectionClosedException; -import org.apache.qpid.amqp_1_0.client.LinkDetachedException; -import org.apache.qpid.amqp_1_0.client.Sender; +import org.apache.qpid.amqp_1_0.client.*; +import org.apache.qpid.amqp_1_0.client.Session; import org.apache.qpid.amqp_1_0.jms.MessageProducer; import org.apache.qpid.amqp_1_0.jms.MessageRejectedException; import org.apache.qpid.amqp_1_0.jms.QueueSender; @@ -32,9 +31,13 @@ import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import javax.jms.*; import javax.jms.IllegalStateException; +import javax.jms.Message; import java.util.UUID; import org.apache.qpid.amqp_1_0.type.messaging.Accepted; import org.apache.qpid.amqp_1_0.type.messaging.Rejected; +import org.apache.qpid.amqp_1_0.type.messaging.Source; +import org.apache.qpid.amqp_1_0.type.messaging.codec.AcceptedConstructor; +import org.apache.qpid.amqp_1_0.type.messaging.codec.RejectedConstructor; import org.apache.qpid.amqp_1_0.type.transport.Error; public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher @@ -71,7 +74,14 @@ public class MessageProducerImpl implements MessageProducer, QueueSender, TopicP { try { - _sender = _session.getClientSession().createSender(_session.toAddress(_destination)); + _sender = _session.getClientSession().createSender(_session.toAddress(_destination), new Session.SourceConfigurator() + { + public void configureSource(final Source source) + { + source.setDefaultOutcome(new Accepted()); + source.setOutcomes(AcceptedConstructor.SYMBOL_CONSTRUCTOR, RejectedConstructor.SYMBOL_CONSTRUCTOR); + } + }); } catch (Sender.SenderCreationException e) { diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java index 26b0d80783..09d19f4394 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java @@ -154,8 +154,7 @@ public class Demo extends Util responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - Sender s = session.createSender(queue, getWindowSize(), getMode()); + Sender s = session.createSender(queue, getWindowSize(), getMode(), null); Properties properties = new Properties(); diff --git a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java index 7845e318cb..bce7bfcd9a 100644 --- a/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java +++ b/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java @@ -21,7 +21,6 @@ package org.apache.qpid.amqp_1_0.client; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; import org.apache.qpid.amqp_1_0.type.Section; import org.apache.qpid.amqp_1_0.type.UnsignedInteger; import org.apache.qpid.amqp_1_0.type.UnsignedLong; @@ -143,8 +142,7 @@ public class Request extends Util responseReceiver.setCredit(UnsignedInteger.valueOf(getWindowSize()), true); - - Sender s = session.createSender(queue, getWindowSize(), getMode()); + Sender s = session.createSender(queue, getWindowSize(), getMode(), null); Transaction txn = null; diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java index 6727459b56..e35248f58c 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java @@ -107,6 +107,16 @@ public class Sender implements DeliveryStateHandler this(session, linkName, createTarget(targetAddr, isDurable), createSource(sourceAddr), window, mode, unsettled); } + protected void configureSource(org.apache.qpid.amqp_1_0.type.messaging.Source source) + { + + } + + protected void configureTarget(org.apache.qpid.amqp_1_0.type.messaging.Target target) + { + + } + private static org.apache.qpid.amqp_1_0.type.messaging.Source createSource(final String sourceAddr) { org.apache.qpid.amqp_1_0.type.messaging.Source source = new org.apache.qpid.amqp_1_0.type.messaging.Source(); @@ -133,6 +143,8 @@ public class Sender implements DeliveryStateHandler _session = session; session.getConnection().checkNotClosed(); + configureSource(source); + configureTarget(target); _endpoint = session.getEndpoint().createSendingLinkEndpoint(linkName, source, target, unsettled); @@ -189,16 +201,15 @@ public class Sender implements DeliveryStateHandler super.remoteDetached(endpoint, detach); } }); - final org.apache.qpid.amqp_1_0.type.messaging.Source remoteSource = - (org.apache.qpid.amqp_1_0.type.messaging.Source) getSource(); - _defaultOutcome = remoteSource.getDefaultOutcome(); + + _defaultOutcome = source.getDefaultOutcome(); if(_defaultOutcome == null) { - if(remoteSource.getOutcomes() == null || remoteSource.getOutcomes().length == 0) + if(source.getOutcomes() == null || source.getOutcomes().length == 0) { _defaultOutcome = new Accepted(); } - else if(remoteSource.getOutcomes().length == 1) + else if(source.getOutcomes().length == 1) { final AMQPDescribedTypeRegistry describedTypeRegistry = _endpoint.getSession() @@ -206,7 +217,7 @@ public class Sender implements DeliveryStateHandler .getDescribedTypeRegistry(); DescribedTypeConstructor constructor = describedTypeRegistry - .getConstructor(remoteSource.getOutcomes()[0]); + .getConstructor(source.getOutcomes()[0]); if(constructor != null) { Object impliedOutcome = constructor.construct(Collections.EMPTY_LIST); diff --git a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java index 79ed3b4457..626ea0f34d 100644 --- a/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java +++ b/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Session.java @@ -63,15 +63,26 @@ public class Session public synchronized Sender createSender(final String targetName) throws Sender.SenderCreationException, ConnectionClosedException { - return createSender(targetName, false); + + final String sourceName = UUID.randomUUID().toString(); + return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false); + } - public synchronized Sender createSender(final String targetName, boolean synchronous) + + public synchronized Sender createSender(final String targetName, final SourceConfigurator configurator) throws Sender.SenderCreationException, ConnectionClosedException { final String sourceName = UUID.randomUUID().toString(); - return new Sender(this, targetName+"<-"+sourceName, targetName, sourceName, synchronous); + return new Sender(this, targetName +"<-"+sourceName, targetName, sourceName, false) + { + @Override + protected void configureSource(final Source source) + { + configurator.configureSource(source); + } + }; } @@ -83,22 +94,10 @@ public class Session } - public Sender createSender(String targetName, int window, AcknowledgeMode mode) - throws Sender.SenderCreationException, ConnectionClosedException - { - - return createSender(targetName, window, mode, null); - } - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName) throws Sender.SenderCreationException, ConnectionClosedException { - return createSender(targetName, window, mode, linkName, null); - } - public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, Map<Binary, Outcome> unsettled) - throws Sender.SenderCreationException, ConnectionClosedException - { - return createSender(targetName, window, mode, linkName, false, unsettled); + return createSender(targetName, window, mode, linkName, false, null); } public Sender createSender(String targetName, int window, AcknowledgeMode mode, String linkName, @@ -381,4 +380,9 @@ public class Session } } } + + public static interface SourceConfigurator + { + public void configureSource(final Source source); + } } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java index b634542fd6..e24fb1af7d 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/Source.java @@ -152,7 +152,7 @@ public class Source return _outcomes; } - public void setOutcomes(Symbol[] outcomes) + public void setOutcomes(Symbol... outcomes) { _outcomes = outcomes; } diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java index 8000853a4d..438378599e 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/AcceptedConstructor.java @@ -34,9 +34,10 @@ import java.util.List; public class AcceptedConstructor extends DescribedTypeConstructor<Accepted> { + public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:accepted:list"); private static final Object[] DESCRIPTORS = { - Symbol.valueOf("amqp:accepted:list"),UnsignedLong.valueOf(0x0000000000000024L), + SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000024L), }; private static final AcceptedConstructor INSTANCE = new AcceptedConstructor(); diff --git a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java index eb923c535d..d7713bbb52 100644 --- a/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java +++ b/qpid/java/amqp-1-0-common/src/main/java/org/apache/qpid/amqp_1_0/type/messaging/codec/RejectedConstructor.java @@ -34,9 +34,10 @@ import java.util.List; public class RejectedConstructor extends DescribedTypeConstructor<Rejected> { + public static final Symbol SYMBOL_CONSTRUCTOR = Symbol.valueOf("amqp:rejected:list"); private static final Object[] DESCRIPTORS = { - Symbol.valueOf("amqp:rejected:list"),UnsignedLong.valueOf(0x0000000000000025L), + SYMBOL_CONSTRUCTOR,UnsignedLong.valueOf(0x0000000000000025L), }; private static final RejectedConstructor INSTANCE = new RejectedConstructor(); |
