diff options
| author | Alan Conway <aconway@apache.org> | 2008-02-18 21:54:02 +0000 |
|---|---|---|
| committer | Alan Conway <aconway@apache.org> | 2008-02-18 21:54:02 +0000 |
| commit | c612a6c6200fd9a8f9830cbad062b30b465d3dfe (patch) | |
| tree | fd427336d51e5c091529eb53743592294f7748be /cpp/examples | |
| parent | 3966d8be198296525a87a6bd88a42c4bb4f20d03 (diff) | |
| download | qpid-python-c612a6c6200fd9a8f9830cbad062b30b465d3dfe.tar.gz | |
Fixed race condition in the examples: when a listener program prints
its "ready" message, the commands it has sent to the broker may not yet
be complete. This results in sporadic lost messages if the producer is
started immediately (e.g. by a script.)
- Added Session::sync(), wait till all commands to date have completed.
- Call sync() before printing "ready" in listener example programs
- Removed sleep from verify script
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@628875 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/examples')
| -rw-r--r-- | cpp/examples/examples/fanout/listener.cpp | 73 | ||||
| -rw-r--r-- | cpp/examples/examples/pub-sub/topic_listener.cpp | 6 |
2 files changed, 43 insertions, 36 deletions
diff --git a/cpp/examples/examples/fanout/listener.cpp b/cpp/examples/examples/fanout/listener.cpp index 5295e10f34..f465a8554e 100644 --- a/cpp/examples/examples/fanout/listener.cpp +++ b/cpp/examples/examples/fanout/listener.cpp @@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) {} void Listener::received(Message& message) { - std::cout << "Message: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " << message.getDestination() - << std::endl; - subscriptions.cancel(message.getDestination()); - } + std::cout << "Message: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } } int main(int argc, char** argv) { @@ -64,35 +64,38 @@ int main(int argc, char** argv) { Connection connection; Message msg; try { - connection.open(host, port); - Session session = connection.newSession(); - - //--------- Main body of program -------------------------------------------- - - // Unique name for private queue: - std::string myQueue=session.getId().str(); - // Declear my queue. - session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, - arg::autoDelete=true); - // Bind my queue to the fanout exchange. - // Note no routingKey required, the fanout exchange delivers - // all messages to all bound queues unconditionally. - session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); - - // Create a listener and subscribe it to my queue. - SubscriptionManager subscriptions(session); - Listener listener(subscriptions); - subscriptions.subscribe(listener, myQueue); - - // Deliver messages until the subscription is cancelled - // by Listener::received() - std::cout << "Listening" << std::endl; - subscriptions.run(); - - //--------------------------------------------------------------------------- - - connection.close(); - return 0; + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Unique name for private queue: + std::string myQueue=session.getId().str(); + // Declear my queue. + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + // Bind my queue to the fanout exchange. + // Note no routingKey required, the fanout exchange delivers + // all messages to all bound queues unconditionally. + session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + + // Create a listener and subscribe it to my queue. + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, myQueue); + + // Wait for the broker to indicate that our queues have been created. + session.sync(); + + // Deliver messages until the subscription is cancelled + // by Listener::received() + std::cout << "Listening" << std::endl; + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } diff --git a/cpp/examples/examples/pub-sub/topic_listener.cpp b/cpp/examples/examples/pub-sub/topic_listener.cpp index 7364d89abb..7fd31f567e 100644 --- a/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -128,7 +128,8 @@ void Listener::received(Message& message) { } void Listener::listen() { - subscriptions.run(); + // Receive messages + subscriptions.run(); } int main(int argc, char** argv) { @@ -152,6 +153,9 @@ int main(int argc, char** argv) { listener.prepareQueue("news", "#.news"); listener.prepareQueue("weather", "#.weather"); + // Wait for the broker to indicate that our queues have been created. + session.sync(); + std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages |
