diff options
author | Rajith Muditha Attapattu <rajith@apache.org> | 2010-07-22 17:27:24 +0000 |
---|---|---|
committer | Rajith Muditha Attapattu <rajith@apache.org> | 2010-07-22 17:27:24 +0000 |
commit | c05f958148125dddd1736e222883d15e384609b8 (patch) | |
tree | a9a52da407ce44c31905ed9e938dad2d6fe61c27 | |
parent | 9d84eb76dd4285c292deee61a691b4887870208d (diff) | |
download | qpid-python-c05f958148125dddd1736e222883d15e384609b8.tar.gz |
QPID-2752
Added a test case to create and LVQ from the JMS client using the addressing syntax.
Fixed a few bugs in QpidQueueOptions.java.
Modified the MapAccessor to allow any value to be retrieved as a String.
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@966763 13f79535-47bb-0310-9956-ffa450edef68
4 files changed, 84 insertions, 17 deletions
diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java index 12758c2d88..989e0c6fbd 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/AddressHelper.java @@ -66,6 +66,7 @@ public class AddressHelper public static final String QUEUE = "queue"; public static final String KEY = "key"; public static final String ARGUMENTS = "arguments"; + public static final String RELIABILITY = "reliability"; private Address address; private Accessor addressProps; @@ -142,13 +143,11 @@ public class AddressHelper if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE) != null) { - options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE)); - options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY)); + options.setOrderingPolicy(QpidQueueOptions.QPID_LAST_VALUE_QUEUE); } else if (args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE) != null) { - options.setOrderingPolicy(args.getString(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE)); - options.setLvqKey(args.getString(QpidQueueOptions.QPID_LVQ_KEY)); + options.setOrderingPolicy(QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE); } if (args.getString(QpidQueueOptions.QPID_QUEUE_EVENT_GENERATION) != null) diff --git a/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java b/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java index ac9bc59d79..04aa7d146f 100644 --- a/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java +++ b/java/client/src/main/java/org/apache/qpid/client/messaging/address/QpidQueueOptions.java @@ -67,18 +67,19 @@ public class QpidQueueOptions extends HashMap<String,Object> public void setOrderingPolicy(String s) { - if ("lvq".equals(s)) + if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE.equals(s)) { this.put(QPID_LAST_VALUE_QUEUE, 1); } - else if ("lvq_no_browse".equals(s)) + else if (QpidQueueOptions.QPID_LAST_VALUE_QUEUE_NO_BROWSE.equals(s)) { this.put(QPID_LAST_VALUE_QUEUE_NO_BROWSE,1); } else { throw new IllegalArgumentException("Invalid Ordering Policy" + - " should be one of {lvq|lvq_no_browse}"); + " should be one of {" + QpidQueueOptions.QPID_LAST_VALUE_QUEUE + "|" + + QPID_LAST_VALUE_QUEUE_NO_BROWSE + "}"); } } @@ -87,20 +88,16 @@ public class QpidQueueOptions extends HashMap<String,Object> this.put(QPID_LVQ_KEY, key); } - public void setQueueEvents(String s) + public void setQueueEvents(String value) { - if (s.equals("enque_only")) + if (value != null && (value.equals("1") || value.equals("2"))) { - this.put(QPID_QUEUE_EVENT_GENERATION, 1); - } - else if (s.equals("enque_and_dequeue")) - { - this.put(QPID_QUEUE_EVENT_GENERATION,2); + this.put(QPID_QUEUE_EVENT_GENERATION, value); } else { - throw new IllegalArgumentException("Invalid value" + - " should be one of {enqueue_only|enqueue_and_dequeue}"); + throw new IllegalArgumentException("Invalid value for " + + QPID_QUEUE_EVENT_GENERATION + " should be one of {1|2}"); } } } diff --git a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java index 090ecb8d7d..c9d386607d 100644 --- a/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java +++ b/java/common/src/main/java/org/apache/qpid/configuration/Accessor.java @@ -130,7 +130,14 @@ public interface Accessor { if (source != null && source.containsKey(name)) { - return (String)source.get(name); + if (source.get(name) instanceof String) + { + return (String)source.get(name); + } + else + { + return String.valueOf(source.get(name)); + } } else { diff --git a/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java b/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java new file mode 100644 index 0000000000..38906d0f53 --- /dev/null +++ b/java/systests/src/main/java/org/apache/qpid/test/client/queue/LVQTest.java @@ -0,0 +1,64 @@ +package org.apache.qpid.test.client.queue; + +import javax.jms.Connection; +import javax.jms.Destination; +import javax.jms.Message; +import javax.jms.MessageConsumer; +import javax.jms.MessageProducer; +import javax.jms.Session; +import javax.jms.TextMessage; + +import org.apache.qpid.test.client.destination.AddressBasedDestinationTest; +import org.apache.qpid.test.utils.QpidBrokerTestCase; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class LVQTest extends QpidBrokerTestCase +{ + private static final Logger _logger = LoggerFactory.getLogger(LVQTest.class); + private Connection _connection; + + @Override + public void setUp() throws Exception + { + super.setUp(); + _connection = getConnection() ; + _connection.start(); + } + + @Override + public void tearDown() throws Exception + { + _connection.close(); + super.tearDown(); + } + + public void testLVQQueue() throws Exception + { + String addr = "ADDR:my-lvq-queue; {create: always, " + + "node: {x-bindings: [{exchange : 'amq.direct', key : test}], " + + "x-declare:{'qpid.last_value_queue':1}}}"; + + Session ssn = _connection.createSession(false,Session.AUTO_ACKNOWLEDGE); + + Destination dest = ssn.createQueue(addr); + MessageConsumer consumer = ssn.createConsumer(dest); + MessageProducer prod = ssn.createProducer(ssn.createQueue("ADDR:amq.direct/test")); + + for (int i=0; i<40; i++) + { + Message msg = ssn.createTextMessage(String.valueOf(i)); + msg.setStringProperty("qpid.LVQ_key", String.valueOf(i%10)); + prod.send(msg); + } + + for (int i=0; i<10; i++) + { + TextMessage msg = (TextMessage)consumer.receive(500); + assertEquals("The last value is not reflected","3" + i,msg.getText()); + } + + assertNull("There should not be anymore messages",consumer.receive(500)); + } + +} |