summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/broker/Queue.cpp1
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.cpp8
-rw-r--r--qpid/cpp/src/qpid/broker/QueueSettings.h4
-rw-r--r--qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py80
4 files changed, 92 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp
index 5e806699de..4c9058e78b 100644
--- a/qpid/cpp/src/qpid/broker/Queue.cpp
+++ b/qpid/cpp/src/qpid/broker/Queue.cpp
@@ -755,6 +755,7 @@ void Queue::push(Message& message, bool /*isRecovery*/)
{
Mutex::ScopedLock locker(messageLock);
message.setSequence(++sequence);
+ if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence);
messages->publish(message);
listeners.populate(copy);
observeEnqueue(message, locker);
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
index 93b832733c..35aa46260c 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp
@@ -63,6 +63,7 @@ const std::string LVQ_LEGACY("qpid.last_value_queue");
const std::string LVQ_LEGACY_KEY("qpid.LVQ_key");
const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse");
+const std::string SEQUENCING("qpid.queue_msg_sequence");
bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings)
{
@@ -97,7 +98,8 @@ QueueSettings::QueueSettings(bool d, bool a) :
noLocal(false),
isBrowseOnly(false),
autoDeleteDelay(0),
- alertRepeatInterval(60)
+ alertRepeatInterval(60),
+ sequencing(false)
{}
bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value)
@@ -203,6 +205,10 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v
} else if (key == PAGE_FACTOR) {
pageFactor = value;
return true;
+ } else if (key == SEQUENCING) {
+ sequenceKey = value.getString();
+ sequencing = !sequenceKey.empty();
+ return true;
} else if (key == FILTER) {
filter = value.asString();
return true;
diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h
index cf430db76d..19667e93ae 100644
--- a/qpid/cpp/src/qpid/broker/QueueSettings.h
+++ b/qpid/cpp/src/qpid/broker/QueueSettings.h
@@ -83,6 +83,10 @@ struct QueueSettings
uint64_t maxFileSize;
uint64_t maxFileCount;
+ std::string sequenceKey;
+ // store bool to avoid testing string value
+ bool sequencing;
+
std::string filter;
//yuck, yuck
diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
index 18a13e3ddf..c4a47029b9 100644
--- a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
+++ b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py
@@ -123,3 +123,83 @@ class GeneralTests(Base):
self.assertEqual(rx_alt.available(), 0, "No further messages should be received via the alternate exchange")
sess4.close()
+
+class SequenceNumberTests(Base):
+ """
+ Tests of ring queue sequence number
+ """
+
+ def fail(self, text=None):
+ if text:
+ print "Fail: %r" % text
+ assert None
+
+ def setup_connection(self):
+ return Connection.establish(self.broker, **self.connection_options())
+
+ def setup_session(self):
+ return self.conn.session()
+
+ def setup_sender(self, name="ring-sequence-queue", key="qpid.queue_msg_sequence"):
+ addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}" % (name, key)
+ sender = self.ssn.sender(addr)
+ return sender
+
+ def test_create_sequence_queue(self):
+ """
+ Test a queue with sequencing can be created
+ """
+
+ #setup, declare a queue
+ try:
+ sender = self.setup_sender()
+ except:
+ self.fail("Unable to create ring queue with sequencing enabled")
+
+ def test_get_sequence_number(self):
+ """
+ Test retrieving sequence number for queues
+ """
+
+ key = "k"
+ sender = self.setup_sender("ring-sequence-queue2", key=key)
+
+ # send and receive 1 message and test the sequence number
+ msg = Message()
+ sender.send(msg)
+
+ receiver = self.ssn.receiver("ring-sequence-queue2")
+ msg = receiver.fetch(1)
+ try:
+ seqNo = msg.properties[key]
+ if int(seqNo) != 1:
+ txt = "Unexpected sequence number. Should be 1. Received (%s)" % seqNo
+ self.fail(txt)
+ except:
+ txt = "Unable to get key (%s) from message properties" % key
+ self.fail(txt)
+ receiver.close()
+
+ def test_sequence_number_gap(self):
+ """
+ Test that sequence number for ring queues shows gaps when queue
+ messages are overwritten
+ """
+ key = "qpid.seq"
+ sender = self.setup_sender("ring-sequence-queue3", key=key)
+ receiver = self.ssn.receiver("ring-sequence-queue3")
+
+ msg = Message()
+ sender.send(msg)
+ msg = receiver.fetch(1)
+
+ # send 5 more messages to overflow the queue
+ for i in range(5):
+ sender.send(msg)
+
+ msg = receiver.fetch(1)
+ seqNo = msg.properties[key]
+ if int(seqNo) != 3:
+ txt = "Unexpected sequence number. Should be 3. Received (%s)" % seqNo
+ self.fail(txt)
+ receiver.close()