diff options
| -rw-r--r-- | qpid/cpp/src/qpid/broker/Queue.cpp | 1 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.cpp | 8 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/broker/QueueSettings.h | 4 | ||||
| -rw-r--r-- | qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py | 80 |
4 files changed, 92 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/Queue.cpp b/qpid/cpp/src/qpid/broker/Queue.cpp index 5e806699de..4c9058e78b 100644 --- a/qpid/cpp/src/qpid/broker/Queue.cpp +++ b/qpid/cpp/src/qpid/broker/Queue.cpp @@ -755,6 +755,7 @@ void Queue::push(Message& message, bool /*isRecovery*/) { Mutex::ScopedLock locker(messageLock); message.setSequence(++sequence); + if (settings.sequencing) message.addAnnotation(settings.sequenceKey, (uint32_t)sequence); messages->publish(message); listeners.populate(copy); observeEnqueue(message, locker); diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.cpp b/qpid/cpp/src/qpid/broker/QueueSettings.cpp index 93b832733c..35aa46260c 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.cpp +++ b/qpid/cpp/src/qpid/broker/QueueSettings.cpp @@ -63,6 +63,7 @@ const std::string LVQ_LEGACY("qpid.last_value_queue"); const std::string LVQ_LEGACY_KEY("qpid.LVQ_key"); const std::string LVQ_LEGACY_NOBROWSE("qpid.last_value_queue_no_browse"); +const std::string SEQUENCING("qpid.queue_msg_sequence"); bool handleFairshareSetting(const std::string& basename, const std::string& key, const qpid::types::Variant& value, QueueSettings& settings) { @@ -97,7 +98,8 @@ QueueSettings::QueueSettings(bool d, bool a) : noLocal(false), isBrowseOnly(false), autoDeleteDelay(0), - alertRepeatInterval(60) + alertRepeatInterval(60), + sequencing(false) {} bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& value) @@ -203,6 +205,10 @@ bool QueueSettings::handle(const std::string& key, const qpid::types::Variant& v } else if (key == PAGE_FACTOR) { pageFactor = value; return true; + } else if (key == SEQUENCING) { + sequenceKey = value.getString(); + sequencing = !sequenceKey.empty(); + return true; } else if (key == FILTER) { filter = value.asString(); return true; diff --git a/qpid/cpp/src/qpid/broker/QueueSettings.h b/qpid/cpp/src/qpid/broker/QueueSettings.h index cf430db76d..19667e93ae 100644 --- a/qpid/cpp/src/qpid/broker/QueueSettings.h +++ b/qpid/cpp/src/qpid/broker/QueueSettings.h @@ -83,6 +83,10 @@ struct QueueSettings uint64_t maxFileSize; uint64_t maxFileCount; + std::string sequenceKey; + // store bool to avoid testing string value + bool sequencing; + std::string filter; //yuck, yuck diff --git a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py index 18a13e3ddf..c4a47029b9 100644 --- a/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py +++ b/qpid/tests/src/py/qpid_tests/broker_0_10/new_api.py @@ -123,3 +123,83 @@ class GeneralTests(Base): self.assertEqual(rx_alt.available(), 0, "No further messages should be received via the alternate exchange") sess4.close() + +class SequenceNumberTests(Base): + """ + Tests of ring queue sequence number + """ + + def fail(self, text=None): + if text: + print "Fail: %r" % text + assert None + + def setup_connection(self): + return Connection.establish(self.broker, **self.connection_options()) + + def setup_session(self): + return self.conn.session() + + def setup_sender(self, name="ring-sequence-queue", key="qpid.queue_msg_sequence"): + addr = "%s; {create:sender, delete:always, node: {x-declare: {arguments: {'qpid.queue_msg_sequence':'%s', 'qpid.policy_type':'ring', 'qpid.max_count':4}}}}" % (name, key) + sender = self.ssn.sender(addr) + return sender + + def test_create_sequence_queue(self): + """ + Test a queue with sequencing can be created + """ + + #setup, declare a queue + try: + sender = self.setup_sender() + except: + self.fail("Unable to create ring queue with sequencing enabled") + + def test_get_sequence_number(self): + """ + Test retrieving sequence number for queues + """ + + key = "k" + sender = self.setup_sender("ring-sequence-queue2", key=key) + + # send and receive 1 message and test the sequence number + msg = Message() + sender.send(msg) + + receiver = self.ssn.receiver("ring-sequence-queue2") + msg = receiver.fetch(1) + try: + seqNo = msg.properties[key] + if int(seqNo) != 1: + txt = "Unexpected sequence number. Should be 1. Received (%s)" % seqNo + self.fail(txt) + except: + txt = "Unable to get key (%s) from message properties" % key + self.fail(txt) + receiver.close() + + def test_sequence_number_gap(self): + """ + Test that sequence number for ring queues shows gaps when queue + messages are overwritten + """ + key = "qpid.seq" + sender = self.setup_sender("ring-sequence-queue3", key=key) + receiver = self.ssn.receiver("ring-sequence-queue3") + + msg = Message() + sender.send(msg) + msg = receiver.fetch(1) + + # send 5 more messages to overflow the queue + for i in range(5): + sender.send(msg) + + msg = receiver.fetch(1) + seqNo = msg.properties[key] + if int(seqNo) != 3: + txt = "Unexpected sequence number. Should be 3. Received (%s)" % seqNo + self.fail(txt) + receiver.close() |
