From efb5b0171cd6049db8fdd2aa3f9546c65be19852 Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Fri, 9 Nov 2007 02:38:33 +0000 Subject: QPID-676: Jonathan Robie's C++ examples. Made the following alterations for recent C++ API changes: - use arg:: namespace for Session keyword arguments. - removed trailing _ on session method names. cpp/examples/Makefile.am calls make in each example directory with flags to build examples from headers/libraries SVN checkout. Examples themselves have a plain Makefile (not automake) which will work as is if qpid is installed in standard places. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@593402 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/examples/direct/Makefile | 20 +++ cpp/examples/direct/direct_config_queues.cpp | 86 ++++++++++ .../direct/direct_persistent_config_queues.cpp | 101 ++++++++++++ .../direct/direct_persistent_publisher.cpp | 111 +++++++++++++ cpp/examples/direct/direct_publisher.cpp | 104 ++++++++++++ cpp/examples/direct/listener.cpp | 134 ++++++++++++++++ cpp/examples/fanout/Makefile | 17 ++ cpp/examples/fanout/fanout_config_queues.cpp | 86 ++++++++++ cpp/examples/fanout/fanout_consumer.cpp | 83 ++++++++++ cpp/examples/fanout/fanout_publisher.cpp | 102 ++++++++++++ cpp/examples/fanout/listener.cpp | 134 ++++++++++++++++ cpp/examples/pub-sub/Makefile | 17 ++ cpp/examples/pub-sub/topic_config_queues.cpp | 128 +++++++++++++++ cpp/examples/pub-sub/topic_listener.cpp | 169 +++++++++++++++++++ cpp/examples/pub-sub/topic_publisher.cpp | 123 ++++++++++++++ cpp/examples/request-response/Makefile | 15 ++ cpp/examples/request-response/client.cpp | 178 +++++++++++++++++++++ cpp/examples/request-response/server.cpp | 153 ++++++++++++++++++ cpp/examples/topic_listener.cpp | 171 -------------------- cpp/examples/topic_publisher.cpp | 82 ---------- cpp/src/qpid/client/Session.h | 33 ++++ 21 files changed, 1794 insertions(+), 253 deletions(-) create mode 100644 cpp/examples/direct/Makefile create mode 100644 cpp/examples/direct/direct_config_queues.cpp create mode 100644 cpp/examples/direct/direct_persistent_config_queues.cpp create mode 100644 cpp/examples/direct/direct_persistent_publisher.cpp create mode 100644 cpp/examples/direct/direct_publisher.cpp create mode 100644 cpp/examples/direct/listener.cpp create mode 100644 cpp/examples/fanout/Makefile create mode 100644 cpp/examples/fanout/fanout_config_queues.cpp create mode 100644 cpp/examples/fanout/fanout_consumer.cpp create mode 100644 cpp/examples/fanout/fanout_publisher.cpp create mode 100644 cpp/examples/fanout/listener.cpp create mode 100644 cpp/examples/pub-sub/Makefile create mode 100644 cpp/examples/pub-sub/topic_config_queues.cpp create mode 100644 cpp/examples/pub-sub/topic_listener.cpp create mode 100644 cpp/examples/pub-sub/topic_publisher.cpp create mode 100644 cpp/examples/request-response/Makefile create mode 100644 cpp/examples/request-response/client.cpp create mode 100644 cpp/examples/request-response/server.cpp delete mode 100644 cpp/examples/topic_listener.cpp delete mode 100644 cpp/examples/topic_publisher.cpp create mode 100644 cpp/src/qpid/client/Session.h (limited to 'cpp') diff --git a/cpp/examples/direct/Makefile b/cpp/examples/direct/Makefile new file mode 100644 index 0000000000..c94e900a75 --- /dev/null +++ b/cpp/examples/direct/Makefile @@ -0,0 +1,20 @@ +CXX=g++ +CXXFLAGS= + +PROGRAMS=direct_config_queues listener direct_publisher direct_persistent_publisher +all: $(PROGRAMS) + +direct_config_queues: direct_config_queues.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +listener: listener.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +direct_publisher: direct_publisher.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +direct_persistent_publisher: direct_persistent_publisher.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +clean: + rm -f $(PROGRAMS) diff --git a/cpp/examples/direct/direct_config_queues.cpp b/cpp/examples/direct/direct_config_queues.cpp new file mode 100644 index 0000000000..3a52d4f62f --- /dev/null +++ b/cpp/examples/direct/direct_config_queues.cpp @@ -0,0 +1,86 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_config_queues.cpp + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp (this program): + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue named "message_queue", and route all messages whose + // routing key is "routing_key to this newly created queue. + + session.queueDeclare(arg::queue="message_queue"); + session.queueBind(arg::queue="message_queue", arg::routingKey="routing_key"); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/direct/direct_persistent_config_queues.cpp b/cpp/examples/direct/direct_persistent_config_queues.cpp new file mode 100644 index 0000000000..afe076278b --- /dev/null +++ b/cpp/examples/direct/direct_persistent_config_queues.cpp @@ -0,0 +1,101 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_config_durable_queues.cpp + * + * This program is one of a set of programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * + * direct_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + * direct_persistent_publisher.cpp: + * + * Publishes a combination of persistent and transient messages + * to a broker, specifying a routing key. The persistent messages + * survive server restart, the transient ones do not (unless the + * queues are configured as durable queues). + * + * direct_config_durable_queues.cpp (this program): + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. Uses persistent queues, so all + * messages on the queue survive server restart. + * + * + */ + +#include +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue named "message_queue", and route all messages whose + // routing key is "routing_key to this newly created queue. + + session.queueDeclare(arg::queue="message_queue"); + session.queueBind(arg::queue="message_queue", arg::routingKey="routing_key"); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/direct/direct_persistent_publisher.cpp b/cpp/examples/direct/direct_persistent_publisher.cpp new file mode 100644 index 0000000000..75637c7eb9 --- /dev/null +++ b/cpp/examples/direct/direct_persistent_publisher.cpp @@ -0,0 +1,111 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * direct_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp (this program): + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + + +#include +#include +#include + + +#include +#include +#include + +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main() { + Connection connection; + Message message; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // The routing key is a message property. We will use the same + // routing key for each message, so we'll set this property + // just once. (In most simple cases, there is no need to set + // other message properties.) + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "Message " << i; + message.setData(message_data.str()); + + // Make odd-numbered messages persistent + + if (i % 1) + message.getDeliveryProperties().setDeliveryMode(PERSISTENT); + else + message.getDeliveryProperties().setDeliveryMode(TRANSIENT); + + session.messageTransfer(arg::content=message); + } + + // And send a final message to indicate termination. + + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/direct/direct_publisher.cpp b/cpp/examples/direct/direct_publisher.cpp new file mode 100644 index 0000000000..5135f926c3 --- /dev/null +++ b/cpp/examples/direct/direct_publisher.cpp @@ -0,0 +1,104 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * direct_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp (this program): + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + + +#include +#include +#include + + +#include +#include +#include + +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main() { + Connection connection; + Message message; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // The routing key is a message property. We will use the same + // routing key for each message, so we'll set this property + // just once. (In most simple cases, there is no need to set + // other message properties.) + + message.getDeliveryProperties().setRoutingKey("routing_key"); + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message); + } + + // And send a final message to indicate termination. + + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/direct/listener.cpp b/cpp/examples/direct/listener.cpp new file mode 100644 index 0000000000..52840efa03 --- /dev/null +++ b/cpp/examples/direct/listener.cpp @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_listener.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp (this program): + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ +private: + std::string destination_name; + Dispatcher dispatcher; +public: + Listener(Session& session, string destination_name): + destination_name(destination_name), + dispatcher(session) + {}; + + virtual void listen(); + virtual void received(Message& message); + ~Listener() { }; +}; + + +void Listener::listen() { + std::cout << "Activating listener for: " < +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue named "message_queue", and route all messages whose + // routing key is "routing_key to this newly created queue. + + session.queueDeclare(arg::queue="message_queue"); + session.queueBind(arg::queue="message_queue", arg::exchange="amq.fanout"); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/fanout/fanout_consumer.cpp b/cpp/examples/fanout/fanout_consumer.cpp new file mode 100644 index 0000000000..663c765159 --- /dev/null +++ b/cpp/examples/fanout/fanout_consumer.cpp @@ -0,0 +1,83 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_listener.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * direct_consumer.cpp (this program): + * + * Reads from a queue on the broker using session.get(). + * + * This is less efficient that direct_listener.cpp, but simpler, + * and can be a better approach when synchronizing messages from + * multiple queues. + * + */ + +#include +#include +#include +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + Listener listener(session, "destination"); + ### session.get(); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/fanout/fanout_publisher.cpp b/cpp/examples/fanout/fanout_publisher.cpp new file mode 100644 index 0000000000..976c53aae4 --- /dev/null +++ b/cpp/examples/fanout/fanout_publisher.cpp @@ -0,0 +1,102 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * direct_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp (this program): + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + + +#include +#include +#include + + +#include +#include +#include + +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main() { + Connection connection; + Message message; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Unlike topic exchanges and direct exchanges, a fanout + // exchange need not set a routing key. + + Message message; + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + } + + // And send a final message to indicate termination. + + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/fanout/listener.cpp b/cpp/examples/fanout/listener.cpp new file mode 100644 index 0000000000..52840efa03 --- /dev/null +++ b/cpp/examples/fanout/listener.cpp @@ -0,0 +1,134 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_listener.cpp: + * + * This program is one of three programs designed to be used + * together. These programs do not specify the exchange type - the + * default exchange type is the direct exchange. + * + * direct_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * direct_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * direct_listener.cpp (this program): + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include +#include +#include +#include +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ +private: + std::string destination_name; + Dispatcher dispatcher; +public: + Listener(Session& session, string destination_name): + destination_name(destination_name), + dispatcher(session) + {}; + + virtual void listen(); + virtual void received(Message& message); + ~Listener() { }; +}; + + +void Listener::listen() { + std::cout << "Activating listener for: " < +#include + +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main() { + Connection connection; + Message msg; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + + /* A consumer application reads from the queue, and needs no + * knowledge of the exchanges used to route messages to the + * queue, or of the routing keys. + * + * A publisher application writes to the exchange, providing a + * routing key, It needs no knowledge of the queues or bindings + * used to route messages to consumers. + */ + + + /* Create queues on the broker. */ + + session.queueDeclare(arg::queue="news_queue"); + session.queueDeclare(arg::queue="weather_queue"); + session.queueDeclare(arg::queue="usa_queue"); + session.queueDeclare(arg::queue="europe_queue"); + + /* Bind these queues using routing keys, so messages will be + delivered to the right queues. */ + + session.queueBind(arg::exchange="amq.topic", arg::queue="news_queue", arg::routingKey="#.news"); + session.queueBind(arg::exchange="amq.topic", arg::queue="weather_queue", arg::routingKey="#.weather"); + session.queueBind(arg::exchange="amq.topic", arg::queue="usa_queue", arg::routingKey="usa.#"); + session.queueBind(arg::exchange="amq.topic", arg::queue="europe_queue", arg::routingKey="europe.#"); + + + /* + * We use a separate 'control' routing key for control + * messages. All such messages are routed to each queue. In + * this demo, we use a message with the content "That's all, + * Folks!" to signal that no more messages will be sent, and + * users of the queue can stop listening for messages. + * + * Because wildcard matching can result in more than one match for + * a given message, it can place more messages on the queues than + * were originally received. + * + * We do not use wildcard matching for control messages. We + * want to make sure that each such message is received once + * and only once. + */ + + + session.queueBind(arg::exchange="amq.topic", arg::queue="news_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="weather_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="usa_queue", arg::routingKey="control"); + session.queueBind(arg::exchange="amq.topic", arg::queue="europe_queue", arg::routingKey="control"); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/cpp/examples/pub-sub/topic_listener.cpp b/cpp/examples/pub-sub/topic_listener.cpp new file mode 100644 index 0000000000..323c93dd0b --- /dev/null +++ b/cpp/examples/pub-sub/topic_listener.cpp @@ -0,0 +1,169 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_listener.cpp: + * + * This program is one of three programs designed to be used + * together. These programs use the topic exchange. + * + * topic_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * topic_publisher.cpp: + * + * Publishes to a broker, specifying a routing key. + * + * topic_listener.cpp (this program): + * + * Reads from a queue on the broker using a message listener. + * + */ + +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener { + private: + Session& session; + SubscriptionManager subscriptions; + public: + Listener(Session& session); + virtual void prepareQueue(std::string queue, std::string routing_key); + virtual void received(Message& message); + virtual void listen(); + ~Listener() { }; +}; + + +/* + * Listener::Listener + * + * Subscribe to the queue, route it to a client destination for the + * listener. (The destination name merely identifies the destination + * in the listener, you can use any name as long as you use the same + * name for the listener). + */ + +Listener::Listener(Session& session) : + session(session), + subscriptions(session) +{ +} + + +void Listener::prepareQueue(std::string queue, std::string routing_key) { + + /* Create a unique queue name for this consumer by concatenating + * the queue name parameter with the Session ID. + */ + + queue += session.getId().str(); + std::cout << "Declaring queue: " << queue << std::endl; + + /* Declare an exclusive queue on the broker + */ + + session.queueDeclare(arg::queue=queue, arg::exclusive=true); + + /* Route messages to the new queue if they match the routing key. + * + * Also route any messages to with the "control" routing key to + * this queue so we know when it's time to stop. A publisher sends + * a message with the content "That's all, Folks!", using the + * "control" routing key, when it is finished. + */ + + session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey=routing_key); + session.queueBind(arg::exchange="amq.topic", arg::queue=queue, arg::routingKey="control"); + + /* + * subscribe to the queue using the subscription manager. + */ + + std::cout << "Subscribing to queue " << queue << std::endl; + subscriptions.subscribe(*this, queue); +} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << " from " << message.getDestination() << std::endl; + + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +void Listener::listen() { + subscriptions.run(); +} + +int main() { + Connection connection; + try { + connection.open("127.0.0.1", 5672); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Create a listener for the session + + Listener listener(session); + + // Subscribe to messages on the queues we are interested in + + listener.prepareQueue("usa", "usa.#"); + listener.prepareQueue("europe", "europe.#"); + listener.prepareQueue("news", "#.news"); + listener.prepareQueue("weather", "#.weather"); + + std::cout << "Listening for messages ..." << std::endl; + + // Give up control and receive messages + listener.listen(); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/pub-sub/topic_publisher.cpp b/cpp/examples/pub-sub/topic_publisher.cpp new file mode 100644 index 0000000000..52c2827e58 --- /dev/null +++ b/cpp/examples/pub-sub/topic_publisher.cpp @@ -0,0 +1,123 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * topic_publisher.cpp: + * + * This program is one of three programs designed to be used + * together. These programs use the topic exchange. + * + * topic_config_queues.cpp: + * + * Creates a queue on a broker, binding a routing key to route + * messages to that queue. + * + * topic_publisher.cpp (this program): + * + * Publishes to a broker, specifying a routing key. + * + * topic_listener.cpp + * + * Reads from a queue on the broker using a message listener. + * + */ + + +#include +#include +#include + + +#include +#include +#include + +#include + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +void publish_messages(Session& session, string routing_key) +{ + Message message; + + // Set the routing key once, we'll use the same routing key for all + // messages. + + message.getDeliveryProperties().setRoutingKey(routing_key); + for (int i=0; i<5; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message, arg::destination="amq.topic"); + } + +} + +/* + * no_more_messages() + * + * Send a message to indicate that no more messages are coming. + * Use the 'control' routing key (see comments in topic_config_queues.cpp). + * + */ + +void no_more_messages(Session& session) +{ + Message message; + + message.getDeliveryProperties().setRoutingKey("control"); + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.topic"); +} + +int main() { + Connection connection; + Message message; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + publish_messages(session, "usa.news"); + publish_messages(session, "usa.weather"); + publish_messages(session, "europe.news"); + publish_messages(session, "europe.weather"); + + no_more_messages(session); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/request-response/Makefile b/cpp/examples/request-response/Makefile new file mode 100644 index 0000000000..32612843eb --- /dev/null +++ b/cpp/examples/request-response/Makefile @@ -0,0 +1,15 @@ +CXX=g++ +CXXFLAGS= + +PROGRAMS=client server +all: $(PROGRAMS) + +client: client.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + +server: server.cpp + $(CXX) $(CXXFLAGS) -lqpidclient -o $@ $^ + + +clean: + rm -f $(PROGRAMS) diff --git a/cpp/examples/request-response/client.cpp b/cpp/examples/request-response/client.cpp new file mode 100644 index 0000000000..59024a1cb6 --- /dev/null +++ b/cpp/examples/request-response/client.cpp @@ -0,0 +1,178 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * client.cpp + * + * This program is one of two programs that illustrate the + * request/response pattern. + * + * client.cpp (this program) + * + * Make requests of a service, print the response. + * + * service.cpp + * + * Accept requests, reverse the letters in each message, and + * return it as a response. + * + */ + + +#include +#include +#include +#include +#include + +#include +#include +#include + +#include + +using namespace qpid::client; +using namespace qpid::framing; + +class Listener : public MessageListener{ +private: + Session session; + std::string destination_name; + Dispatcher dispatcher; + int counter; +public: + Listener(Session& session, string destination_name): + destination_name(destination_name), + dispatcher(session), + session(session), + counter(0) + {}; + + virtual void listen(); + virtual void wait(); + virtual void received(Message& message); + ~Listener() { }; +}; + + +void Listener::listen() { + std::cout << "Activating response queue listener for: " < 3) { + std::cout << "Shutting down listener for " << destination_name << std::endl; + dispatcher.stop(); + } +} + + +using std::stringstream; +using std::string; + +int main() { + Connection connection; + Message request; + try { + connection.open("127.0.0.1", 5672 ); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Create a response queue so the server can send us responses + // to our requests. Use the client's session ID as the name + // of the response queue. + + stringstream response_queue; + response_queue << "client " << session.getId(); + + // Use the name of the response queue as the routing key + + session.queueDeclare(arg::queue=response_queue.str()); // ### Nice if I could just use strstream for this + session.queueBind(arg::queue=response_queue.str(), arg::routingKey=response_queue.str()); + + // Create a listener for the response queue and start listening. + + Listener listener(session, response_queue.str()); + listener.listen(); + + + // The routing key for the request queue is simply + // "request_queue", and all clients use the same routing key. + // + // Each client sends the name of their own response queue so + // the service knows where to route messages. + + request.getDeliveryProperties().setRoutingKey("request_queue"); + request.getHeaders().setString("reply-to", response_queue.str()); + + // Now send some requests ... + + string s[] = { + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." + }; + + + for (int i=0; i<4; i++) { + request.setData(s[i]); + session.messageTransfer(arg::content=request); + std::cout << "Request: " << s[i] << std::endl; + } + + // And wait for any outstanding responses to arrive + + listener.wait(); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/cpp/examples/request-response/server.cpp b/cpp/examples/request-response/server.cpp new file mode 100644 index 0000000000..6c82090794 --- /dev/null +++ b/cpp/examples/request-response/server.cpp @@ -0,0 +1,153 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * server.cpp + * + * This program is one of two programs that illustrate the + * request/response pattern. + * + * client.cpp + * + * Make requests of a service, print the response. + * + * server.cpp (this program) + * + * Accept requests, reverse the letters in each message, and + * return it as a response. + * + */ + + +#include +#include +#include +#include +#include + + +#include +#include +#include +#include + +#include +#include + +using namespace qpid::client; +using namespace qpid::framing; +using std::stringstream; +using std::string; + +class Listener : public MessageListener{ +private: + std::string destination_name; + Dispatcher dispatcher; + Session session; +public: + Listener(Session& session, string destination_name): + destination_name(destination_name), + dispatcher(session), + session(session) + {}; + + virtual void listen(); + virtual void received(Message& message); + virtual void wait(); + ~Listener() { }; +}; + + +void Listener::listen() { + std::cout << "Activating request queue listener for: " < -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -using namespace qpid::client; -using namespace qpid::framing; - -class Listener : public MessageListener { - private: - Session_0_10& session; - SubscriptionManager subscriptions; - public: - Listener(Session_0_10& session); - virtual void prepareQueue(std::string queue, std::string routing_key); - virtual void received(Message& message); - virtual void listen(); - ~Listener() { }; -}; - -/** - * Listener::Listener - * - * Subscribe to the queue, route it to a client destination for the - * listener. (The destination name merely identifies the destination - * in the listener, you can use any name as long as you use the same - * name for the listener). - */ -Listener::Listener(Session_0_10& session) : - session(session), - subscriptions(session) -{} - - -void Listener::prepareQueue(std::string queue, std::string routing_key) { - - /* Create a unique queue name for this queue by concatenating - * the Session ID. - */ - queue += session.getId().str(); - - std::cout << "Declaring queue: " << queue << std::endl; - - /* Declare an exclusive queue on the broker - */ - - session.queueDeclare(arg::queue=queue, arg::exclusive=true); - - /* Route messages to the new queue if they match the routing key. - * - * Also route any messages to with the "control" routing key to - * this queue so we know when it's time to stop. A publisher sends - * a message with the content "That's all, Folks!", using the - * "control" routing key, when it is finished. - */ - - session.queueBind(arg::exchange="amq.topic", arg::queue=queue, - arg::routingKey=routing_key); - session.queueBind(arg::exchange="amq.topic", arg::queue=queue, - arg::routingKey="control"); - - - // Subscribe to the queue using the subscription manager. - // The name of the subscription defaults to the name of the queue. - // - std::cout << "Subscribing to queue " << queue << std::endl; - subscriptions.subscribe(*this, queue); -} - -void Listener::received(Message& message) { - // - // message.getDestination() returns the name of the subscription - // to which this message was sent, which by default is the name - // of the queue subscribed to. - // - std::cout << "Message: " << message.getData() - << " from " << message.getDestination() << std::endl; - - if (message.getData() == "That's all, folks!") { - std::cout << "Shutting down listener for " - << message.getDestination() << std::endl; - subscriptions.cancel(message.getDestination()); - } -} - -void Listener::listen() { - // run() will return when all the subscriptions are cancelled. - subscriptions.run(); -} - -int main() { - Connection connection; - try { - connection.open("127.0.0.1", 5672); - Session_0_10 session = connection.newSession(); - - // Create a listener for the session - - Listener listener(session); - - // Subscribe to messages on the queues we are interested in - - listener.prepareQueue("usa", "usa.#"); - listener.prepareQueue("europe", "europe.#"); - listener.prepareQueue("news", "#.news"); - listener.prepareQueue("weather", "#.weather"); - - std::cout << "Listening for messages ..." << std::endl; - - // Give up control and receive messages - listener.listen(); - - - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - diff --git a/cpp/examples/topic_publisher.cpp b/cpp/examples/topic_publisher.cpp deleted file mode 100644 index 7a1cd6a22b..0000000000 --- a/cpp/examples/topic_publisher.cpp +++ /dev/null @@ -1,82 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -/** - * This file provides one half of a test and example of a pub-sub - * style of interaction. See topic_listener.cpp for the other half, in - * which the logic for subscribers is defined. - * - * This file contains the publisher logic. The publisher will send a - * number of messages to the exchange with the appropriate routing key - * for the logical 'topic'. Once it has done this it will then send a - * request that each subscriber report back with the number of message - * it has received and the time that elapsed between receiving the - * first one and receiving the report request. Once the expected - * number of reports are received, it sends out a request that each - * subscriber shutdown. - */ - -#include "qpid/Exception.h" -#include "qpid/client/Channel.h" -#include "qpid/client/Connection.h" -#include "qpid/client/Exchange.h" -#include "qpid/client/MessageListener.h" -#include "qpid/client/Queue.h" -#include "qpid/sys/Monitor.h" -#include -#include "qpid/sys/Time.h" -#include -#include - -using namespace qpid::client; -using namespace qpid::sys; -using std::string; - -int main() { - Connection connection; - Channel channel; - Message msg; - try { - connection.open("127.0.0.1", 5672, "guest", "guest", "/test"); - connection.openChannel(channel); - channel.start(); - - //--------- Main body of program -------------------------------------------- - - for (int i=0; i<10; i++) { - msg.setData("Message "+i); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, "listener"); - } - msg.setData("That's all, folks!"); - channel.publish(msg, Exchange::STANDARD_TOPIC_EXCHANGE, "listener"); - - //----------------------------------------------------------------------------- - - channel.close(); - connection.close(); - return 0; - } catch(const std::exception& error) { - std::cout << error.what() << std::endl; - } - return 1; -} - - diff --git a/cpp/src/qpid/client/Session.h b/cpp/src/qpid/client/Session.h new file mode 100644 index 0000000000..6ddea470e6 --- /dev/null +++ b/cpp/src/qpid/client/Session.h @@ -0,0 +1,33 @@ +#ifndef QPID_CLIENT_SESSION_H +#define QPID_CLIENT_SESSION_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include "qpid/client/Session_0_10.h" + +namespace qpid { +namespace client { + +typedef Session_0_10 Session; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_SESSION_H*/ -- cgit v1.2.1