From c748d3affc1dff412bfa17ee94d5f9f41dee6579 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Mon, 9 Jun 2008 20:55:09 +0000 Subject: Updated doxygen comments in qpid/client/*.h Changed request-response example to use SubscriptionManager like the others. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk@665891 13f79535-47bb-0310-9956-ffa450edef68 --- qpid/cpp/examples/examples/direct/listener.cpp | 1 - qpid/cpp/examples/examples/fanout/listener.cpp | 1 - .../examples/examples/pub-sub/topic_listener.cpp | 1 - .../examples/examples/request-response/client.cpp | 101 +++++++------------ .../examples/examples/request-response/server.cpp | 108 ++++++++------------- .../examples/examples/xml-exchange/listener.cpp | 1 - 6 files changed, 77 insertions(+), 136 deletions(-) (limited to 'qpid/cpp/examples') diff --git a/qpid/cpp/examples/examples/direct/listener.cpp b/qpid/cpp/examples/examples/direct/listener.cpp index fc2fa96ead..b1aa9754bc 100644 --- a/qpid/cpp/examples/examples/direct/listener.cpp +++ b/qpid/cpp/examples/examples/direct/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include #include #include #include diff --git a/qpid/cpp/examples/examples/fanout/listener.cpp b/qpid/cpp/examples/examples/fanout/listener.cpp index d9bf9789dc..294dfc7855 100644 --- a/qpid/cpp/examples/examples/fanout/listener.cpp +++ b/qpid/cpp/examples/examples/fanout/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include #include #include #include diff --git a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp index 4d854e57ff..9996abab19 100644 --- a/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp +++ b/qpid/cpp/examples/examples/pub-sub/topic_listener.cpp @@ -44,7 +44,6 @@ #include #include #include -#include #include #include diff --git a/qpid/cpp/examples/examples/request-response/client.cpp b/qpid/cpp/examples/examples/request-response/client.cpp index 79bc88c6ae..0ee0e78c92 100644 --- a/qpid/cpp/examples/examples/request-response/client.cpp +++ b/qpid/cpp/examples/examples/request-response/client.cpp @@ -39,7 +39,7 @@ #include -#include +#include #include #include #include @@ -54,53 +54,25 @@ using namespace qpid::client; using namespace qpid::framing; class Listener : public MessageListener{ -private: - Session session; - std::string destination_name; - Dispatcher dispatcher; - int counter; -public: - Listener(Session& session, string destination_name): - session(session), - destination_name(destination_name), - dispatcher(session), - counter(0) - {}; - - virtual void listen(); - virtual void wait(); - virtual void received(Message& message); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + int counter; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating response queue listener for: " < 3) { - std::cout << "Shutting down listener for " << destination_name << std::endl; - dispatcher.stop(); - } + ++ counter; + if (counter > 3) { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } } @@ -116,7 +88,7 @@ int main(int argc, char** argv) { connection.open(host, port); Session session = connection.newSession(); - //--------- Main body of program -------------------------------------------- + //--------- Main body of program -------------------------------------------- // Create a response queue so the server can send us responses // to our requests. Use the client's session ID as the name @@ -130,45 +102,40 @@ int main(int argc, char** argv) { session.queueDeclare(arg::queue=response_queue.str()); session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str()); - // Create a listener for the response queue and start listening. - - Listener listener(session, response_queue.str()); - listener.listen(); - - - // The routing key for the request queue is simply - // "request", and all clients use the same routing key. - // // Each client sends the name of their own response queue so // the service knows where to route messages. request.getDeliveryProperties().setRoutingKey("request"); request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str())); + // Create a listener for the response queue and listen for response messages. + std::cout << "Activating response queue listener for: " << response_queue.str() << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, response_queue.str()); + // Now send some requests ... string s[] = { - "Twas brillig, and the slithy toves", - "Did gire and gymble in the wabe.", - "All mimsy were the borogroves,", - "And the mome raths outgrabe." + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." }; for (int i=0; i<4; i++) { - request.setData(s[i]); - // Asynchronous transfer sends messages as quickly as - // possible without waiting for confirmation. - async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); - std::cout << "Request: " << s[i] << std::endl; + request.setData(s[i]); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=request, arg::destination="amq.direct"); + std::cout << "Request: " << s[i] << std::endl; } - // And wait for any outstanding responses to arrive - - listener.wait(); - + std::cout << "Waiting for all responses to arrive ..." << std::endl; + subscriptions.run(); - //----------------------------------------------------------------------------- + //----------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/request-response/server.cpp b/qpid/cpp/examples/examples/request-response/server.cpp index 83144c715d..df189cfdd8 100644 --- a/qpid/cpp/examples/examples/request-response/server.cpp +++ b/qpid/cpp/examples/examples/request-response/server.cpp @@ -39,8 +39,9 @@ #include -#include +#include #include +#include #include #include @@ -59,102 +60,79 @@ using std::stringstream; using std::string; class Listener : public MessageListener{ -private: - std::string destination_name; - Dispatcher dispatcher; - Session session; -public: - Listener(Session& session, string destination_name): - destination_name(destination_name), - dispatcher(session), - session(session) - {}; - - virtual void listen(); - virtual void received(Message& message); - virtual void wait(); - ~Listener() { }; + private: + SubscriptionManager& subscriptions; + AsyncSession asyncSession; + public: + Listener(SubscriptionManager& subscriptions, Session& session); + virtual void received(Message& message); }; - -void Listener::listen() { - std::cout << "Activating request queue listener for: " <1 ? argv[1] : "127.0.0.1"; int port = argc>2 ? atoi(argv[2]) : 5672; - Connection connection; + Connection connection; Message message; try { connection.open(host, port); Session session = connection.newSession(); - //--------- Main body of program -------------------------------------------- + //--------- Main body of program -------------------------------------------- + // Create a request queue for clients to use when making // requests. - string request_queue = "request"; // Use the name of the request queue as the routing key - session.queueDeclare(arg::queue=request_queue); session.exchangeBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::bindingKey=request_queue); - // Create a listener for the request queue and start listening. - - Listener listener(session, request_queue); - listener.listen(); - listener.wait(); + // Create a listener and subscribe it to the request_queue + std::cout << "Activating request queue listener for: " << request_queue << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions, session); + subscriptions.subscribe(listener, request_queue); + // Deliver messages until the subscription is cancelled + // by Listener::received() + std::cout << "Waiting for requests" << std::endl; + subscriptions.run(); - //----------------------------------------------------------------------------- + //----------------------------------------------------------------------------- connection.close(); return 0; diff --git a/qpid/cpp/examples/examples/xml-exchange/listener.cpp b/qpid/cpp/examples/examples/xml-exchange/listener.cpp index 559cfaf8c9..f4305b5c5a 100644 --- a/qpid/cpp/examples/examples/xml-exchange/listener.cpp +++ b/qpid/cpp/examples/examples/xml-exchange/listener.cpp @@ -24,7 +24,6 @@ * the broker using a message listener. */ -#include #include #include #include -- cgit v1.2.1