diff options
Diffstat (limited to 'qpid/cpp/examples')
| -rw-r--r-- | qpid/cpp/examples/examples/fanout/listener.cpp | 73 | ||||
| -rw-r--r-- | qpid/cpp/examples/examples/pub-sub/topic_listener.cpp | 6 |
2 files changed, 43 insertions, 36 deletions
diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index 5295e10f34..f465a8554e 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) {} void Listener::received(Message& message) { - std::cout << "Message: " << message.getData() << std::endl; - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " << message.getDestination() - << std::endl; - subscriptions.cancel(message.getDestination()); - } + std::cout << "Message: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } } int main(int argc, char** argv) { @@ -64,35 +64,38 @@ int main(int argc, char** argv) { Connection connection; Message msg; try { - connection.open(host, port); - Session session = connection.newSession(); - - //--------- Main body of program -------------------------------------------- - - // Unique name for private queue: - std::string myQueue=session.getId().str(); - // Declear my queue. - session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, - arg::autoDelete=true); - // Bind my queue to the fanout exchange. - // Note no routingKey required, the fanout exchange delivers - // all messages to all bound queues unconditionally. - session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); - - // Create a listener and subscribe it to my queue. - SubscriptionManager subscriptions(session); - Listener listener(subscriptions); - subscriptions.subscribe(listener, myQueue); - - // Deliver messages until the subscription is cancelled - // by Listener::received() - std::cout << "Listening" << std::endl; - subscriptions.run(); - - //--------------------------------------------------------------------------- - - connection.close(); - return 0; + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Unique name for private queue: + std::string myQueue=session.getId().str(); + // Declear my queue. + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + // Bind my queue to the fanout exchange. + // Note no routingKey required, the fanout exchange delivers + // all messages to all bound queues unconditionally. + session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue); + + // Create a listener and subscribe it to my queue. + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, myQueue); + + // Wait for the broker to indicate that our queues have been created. + session.sync(); + + // Deliver messages until the subscription is cancelled + // by Listener::received() + std::cout << "Listening" << std::endl; + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; } catch(const std::exception& error) { std::cout << error.what() << std::endl; } diff --git a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp index 7364d89abb..7fd31f567e 100644 --- a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -128,7 +128,8 @@ void Listener::received(Message& message) { } void Listener::listen() { - subscriptions.run(); + // Receive messages + subscriptions.run(); } int main(int argc, char** argv) { @@ -152,6 +153,9 @@ int main(int argc, char** argv) { listener.prepareQueue("news", "#.news"); listener.prepareQueue("weather", "#.weather"); + // Wait for the broker to indicate that our queues have been created. + session.sync(); + std::cout << "Listening for messages ..." << std::endl; // Give up control and receive messages |
