diff options
Diffstat (limited to 'qpid/cpp/src/tests/receiver.cpp')
| -rw-r--r-- | qpid/cpp/src/tests/receiver.cpp | 23 |
1 files changed, 16 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/receiver.cpp b/qpid/cpp/src/tests/receiver.cpp index 3a4ac3649d..1b0b6b2548 100644 --- a/qpid/cpp/src/tests/receiver.cpp +++ b/qpid/cpp/src/tests/receiver.cpp @@ -41,13 +41,17 @@ struct Args : public qpid::TestOptions string queue; uint messages; bool ignoreDuplicates; + uint creditWindow; + uint ackFrequency; - Args() : queue("test-queue"), messages(0), ignoreDuplicates(false) + Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1) { addOptions() ("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages") ("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely") - ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)"); + ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)") + ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)") + ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)"); } }; @@ -56,13 +60,14 @@ const string EOS("eos"); class Receiver : public MessageListener, public FailoverManager::Command { public: - Receiver(const string& queue, uint messages, bool ignoreDuplicates); + Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency); void received(Message& message); void execute(AsyncSession& session, bool isRetry); private: const string queue; const uint count; const bool skipDups; + SubscriptionSettings settings; Subscription subscription; uint processed; uint lastSn; @@ -70,8 +75,12 @@ class Receiver : public MessageListener, public FailoverManager::Command bool isDuplicate(Message& message); }; -Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) : - queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {} +Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) : + queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) +{ + if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow); + settings.autoAck = ackFrequency; +} void Receiver::received(Message & message) { @@ -96,7 +105,7 @@ bool Receiver::isDuplicate(Message& message) void Receiver::execute(AsyncSession& session, bool /*isRetry*/) { SubscriptionManager subs(session); - subscription = subs.subscribe(*this, queue); + subscription = subs.subscribe(*this, queue, settings); subs.run(); } @@ -106,7 +115,7 @@ int main(int argc, char ** argv) try { opts.parse(argc, argv); FailoverManager connection(opts.con); - Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates); + Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency); connection.execute(receiver); connection.close(); return 0; |
