summaryrefslogtreecommitdiff
path: root/qpid/cpp/examples
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/examples')
-rw-r--r--qpid/cpp/examples/examples/fanout/listener.cpp73
-rw-r--r--qpid/cpp/examples/examples/pub-sub/topic_listener.cpp6
2 files changed, 43 insertions, 36 deletions
diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp
index 5295e10f34..f465a8554e 100644
--- a/qpid/cpp/examples/examples/fanout/listener.cpp
+++ b/qpid/cpp/examples/examples/fanout/listener.cpp
@@ -50,12 +50,12 @@ Listener::Listener(SubscriptionManager& subs) : subscriptions(subs)
{}
void Listener::received(Message& message) {
- std::cout << "Message: " << message.getData() << std::endl;
- if (message.getData() == "That's all, folks!") {
- std::cout << "Shutting down listener for " << message.getDestination()
- << std::endl;
- subscriptions.cancel(message.getDestination());
- }
+ std::cout << "Message: " << message.getData() << std::endl;
+ if (message.getData() == "That's all, folks!") {
+ std::cout << "Shutting down listener for " << message.getDestination()
+ << std::endl;
+ subscriptions.cancel(message.getDestination());
+ }
}
int main(int argc, char** argv) {
@@ -64,35 +64,38 @@ int main(int argc, char** argv) {
Connection connection;
Message msg;
try {
- connection.open(host, port);
- Session session = connection.newSession();
-
- //--------- Main body of program --------------------------------------------
-
- // Unique name for private queue:
- std::string myQueue=session.getId().str();
- // Declear my queue.
- session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
- arg::autoDelete=true);
- // Bind my queue to the fanout exchange.
- // Note no routingKey required, the fanout exchange delivers
- // all messages to all bound queues unconditionally.
- session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
-
- // Create a listener and subscribe it to my queue.
- SubscriptionManager subscriptions(session);
- Listener listener(subscriptions);
- subscriptions.subscribe(listener, myQueue);
-
- // Deliver messages until the subscription is cancelled
- // by Listener::received()
- std::cout << "Listening" << std::endl;
- subscriptions.run();
-
- //---------------------------------------------------------------------------
-
- connection.close();
- return 0;
+ connection.open(host, port);
+ Session session = connection.newSession();
+
+ //--------- Main body of program --------------------------------------------
+
+ // Unique name for private queue:
+ std::string myQueue=session.getId().str();
+ // Declear my queue.
+ session.queueDeclare(arg::queue=myQueue, arg::exclusive=true,
+ arg::autoDelete=true);
+ // Bind my queue to the fanout exchange.
+ // Note no routingKey required, the fanout exchange delivers
+ // all messages to all bound queues unconditionally.
+ session.queueBind(arg::exchange="amq.fanout", arg::queue=myQueue);
+
+ // Create a listener and subscribe it to my queue.
+ SubscriptionManager subscriptions(session);
+ Listener listener(subscriptions);
+ subscriptions.subscribe(listener, myQueue);
+
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
+ // Deliver messages until the subscription is cancelled
+ // by Listener::received()
+ std::cout << "Listening" << std::endl;
+ subscriptions.run();
+
+ //---------------------------------------------------------------------------
+
+ connection.close();
+ return 0;
} catch(const std::exception& error) {
std::cout << error.what() << std::endl;
}
diff --git a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
index 7364d89abb..7fd31f567e 100644
--- a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
+++ b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp
@@ -128,7 +128,8 @@ void Listener::received(Message& message) {
}
void Listener::listen() {
- subscriptions.run();
+ // Receive messages
+ subscriptions.run();
}
int main(int argc, char** argv) {
@@ -152,6 +153,9 @@ int main(int argc, char** argv) {
listener.prepareQueue("news", "#.news");
listener.prepareQueue("weather", "#.weather");
+ // Wait for the broker to indicate that our queues have been created.
+ session.sync();
+
std::cout << "Listening for messages ..." << std::endl;
// Give up control and receive messages