summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/tests/receiver.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/tests/receiver.cpp')
-rw-r--r--qpid/cpp/src/tests/receiver.cpp23
1 files changed, 16 insertions, 7 deletions
diff --git a/qpid/cpp/src/tests/receiver.cpp b/qpid/cpp/src/tests/receiver.cpp
index 3a4ac3649d..1b0b6b2548 100644
--- a/qpid/cpp/src/tests/receiver.cpp
+++ b/qpid/cpp/src/tests/receiver.cpp
@@ -41,13 +41,17 @@ struct Args : public qpid::TestOptions
string queue;
uint messages;
bool ignoreDuplicates;
+ uint creditWindow;
+ uint ackFrequency;
- Args() : queue("test-queue"), messages(0), ignoreDuplicates(false)
+ Args() : queue("test-queue"), messages(0), ignoreDuplicates(false), creditWindow(0), ackFrequency(1)
{
addOptions()
("queue", qpid::optValue(queue, "QUEUE NAME"), "Queue from which to request messages")
("messages", qpid::optValue(messages, "N"), "Number of messages to receive; 0 means receive indefinitely")
- ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)");
+ ("ignore-duplicates", qpid::optValue(ignoreDuplicates), "Detect and ignore duplicates (by checking 'sn' header)")
+ ("credit-window", qpid::optValue(creditWindow, "N"), "Credit window (0 implies infinite window)")
+ ("ack-frequency", qpid::optValue(ackFrequency, "N"), "Ack frequency (0 implies none of the messages will get accepted)");
}
};
@@ -56,13 +60,14 @@ const string EOS("eos");
class Receiver : public MessageListener, public FailoverManager::Command
{
public:
- Receiver(const string& queue, uint messages, bool ignoreDuplicates);
+ Receiver(const string& queue, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency);
void received(Message& message);
void execute(AsyncSession& session, bool isRetry);
private:
const string queue;
const uint count;
const bool skipDups;
+ SubscriptionSettings settings;
Subscription subscription;
uint processed;
uint lastSn;
@@ -70,8 +75,12 @@ class Receiver : public MessageListener, public FailoverManager::Command
bool isDuplicate(Message& message);
};
-Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates) :
- queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0) {}
+Receiver::Receiver(const string& q, uint messages, bool ignoreDuplicates, uint creditWindow, uint ackFrequency) :
+ queue(q), count(messages), skipDups(ignoreDuplicates), processed(0), lastSn(0)
+{
+ if (creditWindow) settings.flowControl = FlowControl::messageWindow(creditWindow);
+ settings.autoAck = ackFrequency;
+}
void Receiver::received(Message & message)
{
@@ -96,7 +105,7 @@ bool Receiver::isDuplicate(Message& message)
void Receiver::execute(AsyncSession& session, bool /*isRetry*/)
{
SubscriptionManager subs(session);
- subscription = subs.subscribe(*this, queue);
+ subscription = subs.subscribe(*this, queue, settings);
subs.run();
}
@@ -106,7 +115,7 @@ int main(int argc, char ** argv)
try {
opts.parse(argc, argv);
FailoverManager connection(opts.con);
- Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates);
+ Receiver receiver(opts.queue, opts.messages, opts.ignoreDuplicates, opts.creditWindow, opts.ackFrequency);
connection.execute(receiver);
connection.close();
return 0;