diff options
Diffstat (limited to 'RC9/qpid/cpp/examples')
59 files changed, 3757 insertions, 0 deletions
diff --git a/RC9/qpid/cpp/examples/Makefile.am b/RC9/qpid/cpp/examples/Makefile.am new file mode 100644 index 0000000000..29a101425c --- /dev/null +++ b/RC9/qpid/cpp/examples/Makefile.am @@ -0,0 +1,49 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +SUBDIRS = direct fanout pub-sub request-response failover qmf-console +if HAVE_XML + SUBDIRS += xml-exchange + broker_args = "--no-module-dir --data-dir \"\" --auth no --load-module $(top_builddir)/src/.libs/xml.so" +endif +if !HAVE_XML + exclude_examples_regexp="xml" # Exclude XML examples. + broker_args = "--no-module-dir --data-dir \"\" --auth no" +endif + +MAKEDIST=.libs/Makefile +SUBMAKE=' for d in $(SUBDIRS) ; do $$(MAKE) -C $$$$d $$@ ; done' +$(MAKEDIST): Makefile + mkdir -p .libs + @$(ECHO) all: > $(MAKEDIST) + @$(ECHO) $(SUBMAKE) >> $(MAKEDIST) + @$(ECHO) clean: >> $(MAKEDIST) + @$(ECHO) $(SUBMAKE) >> $(MAKEDIST) + +examplesdir=$(pkgdatadir)/examples +examples_DATA = README $(MAKEDIST) + +EXTRA_DIST = $(examples_DATA) README.verify verify verify_all + +# For older versions of automake +abs_top_srcdir = @abs_top_srcdir@ + +# Verify the examples in the buid tree. +check-local: + $(srcdir)/verify_all $(abs_top_srcdir)/.. $(top_builddir)/src/qpidd $(broker_args) $(exclude_examples_regexp) + diff --git a/RC9/qpid/cpp/examples/README b/RC9/qpid/cpp/examples/README new file mode 100644 index 0000000000..50f0c07089 --- /dev/null +++ b/RC9/qpid/cpp/examples/README @@ -0,0 +1,114 @@ += Qpid C++ Examples = + +Apache Qpid Examples in C++ are found inside this directory, they are +based on the 0-10 version of the AMQP specification (see amqp.org for +details). A short description on each example is found below. + +Please note that you will have to start the Qpid broker on port 5672, +on your localhost (127.0.0.1) before running these examples. However, +it is possible to alternatively specify the host and port when running +each example. + + Ex:- ./declare_queues 127.0.0.1 5673 + +The qpid C++ broker (known as qpidd) is found +- if installed, installed as /usr/sbin/qpidd +- in /path-to-qpid-source/cpp/src/ + +== Direct == + +This is an example on how to create Point-to-Point applications using Qpid. This +example contains three main components. + + 1. declare_queues + This will bind a queue to the amq.direct exchange, so that the messages sent + to the amq.direct exchange, with a given routing key (routing_key) are + delivered to a specific queue (message_queue). + + 2. direct_producer + Publishes messages to the amq.direct exchange using the given routing key + (routing_key) discussed above. + + 3. listener + Uses a message listener to listen messages from a specific queue + (message_queue) as discussed above. + +In order to run this example, + +On Linux: + # ./declare_queues + # ./direct_producer + # ./listener + +== Fanout == + +This is an example on how to create Fanout exchange applications using Qpid. +This example has two components unlike the previous. This is because Fanout +exchange not needing a routing key to be specified. + + 1. fanout_producer + Publishes a message to the amq.fanout exchange, without using a routing key. + + 2. listener + Uses a message listener to listen messages from the amq.fanout exchange. + +Another difference between the above example and this example is that in the +above example there is no requirement for the listener to be activated before +the messages being published. However, in this example, it is required that a +listener be active before the messages being published, if not they will be +lost. + +In order to run this example, + +On Linux: + # ./listener + # ./fanout_producer + +== Publisher/Subscriber == + +Showing The ability to create topic Publishers and Subscribers using Qpid is +the main objective of this example. It is required that you subscribe first, +before publishing any message due to the construction of this example. There +are two main applications in this. + + 1. topic_publisher + This application is used to publish messages to the amq.topic exchange using + multipart routing keys, usa.weather, europe.weather, usa.news and europe.news. + + 2. topic_listener + This application is used to subscribe to several private queues, such as usa, + europe, weather and news. In here, each private queue created is bound to the + amq.topic exchange using bindings that match the corresponding parts of the + multipart routing keys. + Ex:- #.news will retrieve news irrespective of destination. + +This example also shows the use of the 'control' routing key which is used by +control messages. + +In order to run this example, + +On Linux: + # ./topic_listener + # ./topic_publisher + +== Request/Response == + +This example shows a simple server that will accept string from a client and +convert them to upper case and send them back to the client. This too has two +main application like the previous sample. + + 1. client + This sends lines of poetry to the server. + + 2. server + This is a simple service that will convert incoming strings to upper case and + send the result to amq.direct exchange on which the client listens. It uses the + request's reply_to property as the response's routing key. + +In order to run this example, + +On Linux: + # ./server + # ./client + + diff --git a/RC9/qpid/cpp/examples/README.verify b/RC9/qpid/cpp/examples/README.verify new file mode 100644 index 0000000000..d15adce58d --- /dev/null +++ b/RC9/qpid/cpp/examples/README.verify @@ -0,0 +1,26 @@ += Qpid C++ Examples = + +For more information read examples/README. + +== The Verify All Script == + +The verify_all script will run C++ examples against itself and against the +Python examples. The success of the script is determined by comparing its output +against what is expected. + +=== Arguments === + +The verify_all script expects the path to Qpid trunk as an argument, in order to +setup the environment for Python examples. + +== The Verify Script == + +The verify script is capable of running one or many scripts designed to verify +the success of Qpid examples. The verify script is utilized by the verify_all +scripts. + +=== Verifying an individual example === + +This will require you using the verify script, and providing the necessary sub +script(s) it will utilize in the process. Please note that it is your +responsibility to setup the necessary environment for the verification process diff --git a/RC9/qpid/cpp/examples/direct/Makefile.am b/RC9/qpid/cpp/examples/direct/Makefile.am new file mode 100644 index 0000000000..07431f6fe4 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/Makefile.am @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/direct + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=direct_producer listener declare_queues +direct_producer_SOURCES=direct_producer.cpp +direct_producer_LDADD=$(CLIENT_LIB) + +listener_SOURCES=listener.cpp +listener_LDADD=$(CLIENT_LIB) + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + direct_producer.cpp \ + listener.cpp \ + declare_queues.cpp \ + $(MAKEDIST) + +EXTRA_DIST= \ + $(examples_DATA) \ + verify \ + verify.in \ + verify_cpp_python \ + verify_cpp_python.in \ + verify_python_cpp \ + verify_python_cpp.in + diff --git a/RC9/qpid/cpp/examples/direct/declare_queues.cpp b/RC9/qpid/cpp/examples/direct/declare_queues.cpp new file mode 100644 index 0000000000..9a51d1982b --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/declare_queues.cpp @@ -0,0 +1,85 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * declare_queues.cpp + * + * This program is one of three programs designed to be used + * together. + * + * declare_queues.cpp: (this program): + * + * Creates a queue named "message_queue" on a broker, binding the + * queue to the "amq.direct" exchange, using the routing key + * "routing_key". + * + * direct_producer.cpp + * + * Publishes to the "amq.direct" exchange, specifying the routing + * key "routing_key" + * + * listener.cpp + * + * Reads from the "message_queue" queue on the broker using a + * message listener. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + + +using namespace qpid::client; +using namespace qpid::framing; + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Create a queue named "message_queue", and route all messages whose + // routing key is "routing_key" to this newly created queue. + + session.queueDeclare(arg::queue="message_queue"); + session.exchangeBind(arg::exchange="amq.direct", arg::queue="message_queue", arg::bindingKey="routing_key"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/RC9/qpid/cpp/examples/direct/direct_producer.cpp b/RC9/qpid/cpp/examples/direct/direct_producer.cpp new file mode 100644 index 0000000000..7cca1955cf --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/direct_producer.cpp @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * direct_producer.cpp: + * + * This program is one of three programs designed to be used + * together. + * + * create_queues.cpp: + * + * Creates a queue named "message_queue" on a broker, binding the + * queue to the "amq.direct" exchange, using the routing key + * "routing_key". + * + * direct_producer.cpp (this program): + * + * Publishes to the "amq.direct" exchange, specifying the routing + * key "routing_key" + * + * listener.cpp + * + * Reads from the "message_queue" queue on the broker using a + * message listener. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + int count = argc>3 ? atoi(argv[3]) : 10; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // The routing key is a message property. We will use the same + // routing key for each message, so we'll set this property + // just once. (In most simple cases, there is no need to set + // other message properties.) + + Message message; + message.getDeliveryProperties().setRoutingKey("routing_key"); + + // Now send some messages ... + + for (int i=0; i<count; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + } + + // And send a final message to indicate termination. + + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.direct"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/direct/listener.cpp b/RC9/qpid/cpp/examples/direct/listener.cpp new file mode 100644 index 0000000000..55229df8a3 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/listener.cpp @@ -0,0 +1,110 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * listener.cpp: + * + * This program is one of three programs designed to be used + * together. + * + * create_queues.cpp: + * + * Creates a queue named "message_queue" on a broker, binding the + * queue to the "amq.direct" exchange, using the routing key + * "routing_key". + * + * direct_producer.cpp + * + * Publishes to the "amq.direct" exchange, specifying the routing + * key "routing_key" + * + * listener.cpp (this program): + * + * Reads from the "message_queue" queue on the broker using a + * message listener. + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Receive messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/direct/verify b/RC9/qpid/cpp/examples/direct/verify new file mode 100644 index 0000000000..f598bacc1f --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +clients ./declare_queues ./direct_producer ./listener +outputs ./declare_queues.out ./direct_producer.out ./listener.out diff --git a/RC9/qpid/cpp/examples/direct/verify.in b/RC9/qpid/cpp/examples/direct/verify.in new file mode 100644 index 0000000000..d1e95f1151 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify.in @@ -0,0 +1,15 @@ +==== declare_queues.out +==== direct_producer.out +==== listener.out +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for message_queue diff --git a/RC9/qpid/cpp/examples/direct/verify_cpp_python b/RC9/qpid/cpp/examples/direct/verify_cpp_python new file mode 100644 index 0000000000..4dc445ba27 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify_cpp_python @@ -0,0 +1,4 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/direct +clients ./declare_queues ./direct_producer $py/direct_consumer.py +outputs ./declare_queues.out ./direct_producer.out $py/direct_consumer.py.out diff --git a/RC9/qpid/cpp/examples/direct/verify_cpp_python.in b/RC9/qpid/cpp/examples/direct/verify_cpp_python.in new file mode 100644 index 0000000000..1a329be59a --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify_cpp_python.in @@ -0,0 +1,14 @@ +==== declare_queues.out +==== direct_producer.out +==== direct_consumer.py.out +Message 0 +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +That's all, folks! diff --git a/RC9/qpid/cpp/examples/direct/verify_python_cpp b/RC9/qpid/cpp/examples/direct/verify_python_cpp new file mode 100644 index 0000000000..fe4893e120 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify_python_cpp @@ -0,0 +1,5 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/direct +clients $py/declare_queues.py $py/direct_producer.py ./listener +outputs $py/declare_queues.py.out $py/direct_producer.py.out ./listener.out + diff --git a/RC9/qpid/cpp/examples/direct/verify_python_cpp.in b/RC9/qpid/cpp/examples/direct/verify_python_cpp.in new file mode 100644 index 0000000000..6f35255b18 --- /dev/null +++ b/RC9/qpid/cpp/examples/direct/verify_python_cpp.in @@ -0,0 +1,15 @@ +==== declare_queues.py.out +==== direct_producer.py.out +==== listener.out +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for message_queue diff --git a/RC9/qpid/cpp/examples/failover/Makefile.am b/RC9/qpid/cpp/examples/failover/Makefile.am new file mode 100644 index 0000000000..c41f26eaba --- /dev/null +++ b/RC9/qpid/cpp/examples/failover/Makefile.am @@ -0,0 +1,40 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/failover + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=declare_queues resuming_receiver replaying_sender + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +resuming_receiver_SOURCES=resuming_receiver.cpp +resuming_receiver_LDADD=$(CLIENT_LIB) + +replaying_sender_SOURCES=replaying_sender.cpp +replaying_sender_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + declare_queues.cpp \ + resuming_receiver.cpp \ + replaying_sender.cpp \ + $(MAKEDIST) + +# FIXME aconway 2008-10-10: add verify scripts. diff --git a/RC9/qpid/cpp/examples/failover/declare_queues.cpp b/RC9/qpid/cpp/examples/failover/declare_queues.cpp new file mode 100644 index 0000000000..a677870c53 --- /dev/null +++ b/RC9/qpid/cpp/examples/failover/declare_queues.cpp @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/Exception.h> + +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; + +using namespace std; + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + FailoverManager connection(settings); + try { + bool complete = false; + while (!complete) { + Session session = connection.connect().newSession(); + try { + session.queueDeclare(arg::queue="message_queue"); + complete = true; + } catch (const qpid::TransportFailure&) {} + } + connection.close(); + return 0; + } catch (const std::exception& error) { + std::cout << "Failed:" << error.what() << std::endl; + return 1; + } + +} + + + + + diff --git a/RC9/qpid/cpp/examples/failover/replaying_sender.cpp b/RC9/qpid/cpp/examples/failover/replaying_sender.cpp new file mode 100644 index 0000000000..22a7e1ebd3 --- /dev/null +++ b/RC9/qpid/cpp/examples/failover/replaying_sender.cpp @@ -0,0 +1,97 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageReplayTracker.h> +#include <qpid/Exception.h> + +#include <iostream> +#include <sstream> + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + +class Sender : public FailoverManager::Command +{ + public: + Sender(const std::string& queue, uint count); + void execute(AsyncSession& session, bool isRetry); + uint getSent(); + private: + MessageReplayTracker sender; + const uint count; + uint sent; + Message message; + +}; + +Sender::Sender(const std::string& queue, uint count_) : sender(10), count(count_), sent(0) +{ + message.getDeliveryProperties().setRoutingKey(queue); +} + +void Sender::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) sender.replay(session); + else sender.init(session); + while (sent < count) { + stringstream message_data; + message_data << ++sent; + message.setData(message_data.str()); + message.getHeaders().setInt("sn", sent); + sender.send(message); + if (count > 1000 && !(sent % 1000)) { + std::cout << "sent " << sent << " of " << count << std::endl; + } + } + message.setData("That's all, folks!"); + sender.send(message); +} + +uint Sender::getSent() +{ + return sent; +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + FailoverManager connection(settings); + Sender sender("message_queue", argc > 3 ? atoi(argv[3]) : 1000); + try { + connection.execute(sender); + std::cout << "Sent " << sender.getSent() << " messages." << std::endl; + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << "Failed: " << error.what() << std::endl; + } + return 1; +} diff --git a/RC9/qpid/cpp/examples/failover/resuming_receiver.cpp b/RC9/qpid/cpp/examples/failover/resuming_receiver.cpp new file mode 100644 index 0000000000..d1886ce861 --- /dev/null +++ b/RC9/qpid/cpp/examples/failover/resuming_receiver.cpp @@ -0,0 +1,127 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/client/FailoverManager.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <iostream> +#include <fstream> + + +using namespace qpid; +using namespace qpid::client; +using namespace qpid::framing; + +using namespace std; + + +class Listener : public MessageListener, + public FailoverManager::Command, + public FailoverManager::ReconnectionStrategy +{ + public: + Listener(); + void received(Message& message); + void execute(AsyncSession& session, bool isRetry); + void check(); + void editUrlList(std::vector<Url>& urls); + private: + Subscription subscription; + uint count; + uint skipped; + uint lastSn; + bool gaps; +}; + +Listener::Listener() : count(0), skipped(0), lastSn(0), gaps(false) {} + +void Listener::received(Message & message) +{ + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + + std::cout << "Listener received " << count << " messages (" << skipped << " skipped)" << std::endl; + subscription.cancel(); + } else { + uint sn = message.getHeaders().getAsInt("sn"); + if (lastSn < sn) { + if (sn - lastSn > 1) { + std::cout << "Error: gap in sequence between " << lastSn << " and " << sn << std::endl; + gaps = true; + } + lastSn = sn; + ++count; + } else { + ++skipped; + } + } +} + +void Listener::check() +{ + if (gaps) throw Exception("Detected gaps in sequence; messages appear to have been lost."); +} + +void Listener::execute(AsyncSession& session, bool isRetry) +{ + if (isRetry) { + std::cout << "Resuming from " << count << std::endl; + } + SubscriptionManager subs(session); + subscription = subs.subscribe(*this, "message_queue"); + subs.run(); +} + +void Listener::editUrlList(std::vector<Url>& urls) +{ + /** + * A more realistic algorithm would be to search through the list + * for prefered hosts and ensure they come first in the list. + */ + if (urls.size() > 1) std::rotate(urls.begin(), urls.begin() + 1, urls.end()); +} + +int main(int argc, char ** argv) +{ + ConnectionSettings settings; + if (argc > 1) settings.host = argv[1]; + if (argc > 2) settings.port = atoi(argv[2]); + + Listener listener; + FailoverManager connection(settings, &listener); + + try { + connection.execute(listener); + connection.close(); + listener.check(); + std::cout << "Completed without error." << std::endl; + return 0; + } catch(const std::exception& error) { + std::cout << "Failure: " << error.what() << std::endl; + } + return 1; +} + + + diff --git a/RC9/qpid/cpp/examples/fanout/Makefile.am b/RC9/qpid/cpp/examples/fanout/Makefile.am new file mode 100644 index 0000000000..6cbf2c93ad --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/Makefile.am @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/fanout + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=fanout_producer listener +fanout_producer_SOURCES=fanout_producer.cpp +fanout_producer_LDADD=$(CLIENT_LIB) + +listener_SOURCES=listener.cpp +listener_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + fanout_producer.cpp \ + listener.cpp \ + $(MAKEDIST) + +EXTRA_DIST= \ + $(examples_DATA) \ + verify \ + verify.in \ + verify_cpp_python \ + verify_cpp_python.in \ + verify_python_cpp \ + verify_python_cpp.in + + + + + diff --git a/RC9/qpid/cpp/examples/fanout/fanout_producer.cpp b/RC9/qpid/cpp/examples/fanout/fanout_producer.cpp new file mode 100644 index 0000000000..fb16f7e8b1 --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/fanout_producer.cpp @@ -0,0 +1,106 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * fanout_producer.cpp: + * + * This program is one of two programs designed to be used + * together. + * + * fanout_producer.cpp (this program): + * + * Publishes messages to the "amq.fanout" exchange. + * + * listener.cpp + * + * Creates a private queue, binds it to the "amq.fanout" + * exchange, and reads messages from its queue as they + * arrive. Messages sent before the listener binds the queue are + * not received. + * + * Multiple listeners can run at the same time. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Unlike topic exchanges and direct exchanges, a fanout + // exchange need not set a routing key. + + Message message; + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.fanout"); + } + + // And send a final message to indicate termination. + + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.fanout"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/fanout/listener.cpp b/RC9/qpid/cpp/examples/fanout/listener.cpp new file mode 100644 index 0000000000..b6050ef728 --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/listener.cpp @@ -0,0 +1,126 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * listener.cpp + * + * This program is one of two programs designed to be used + * together. + * + * fanout_producer.cpp + * + * Publishes messages to the "amq.fanout" exchange. + * + * listener.cpp (this program) + * + * Creates a private queue, binds it to the "amq.fanout" + * exchange, and reads messages from its queue as they + * arrive. Messages sent before the listener binds the queue are + * not received. + * + * Multiple listeners can run at the same time. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << std::endl; + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Each client creates its own private queue, using the + // session id to guarantee a unique name. It then routes + // all messages from the fanout exchange to its own queue + // by binding to the queue. + // + // The binding specifies a binding key, but for a fanout + // exchange, the binding key is optional and is not used + // for routing decisions. It can be useful for tracking + // messages and routing in logs. + + std::string myQueue=session.getId().getName(); + session.queueDeclare(arg::queue=myQueue, arg::exclusive=true, + arg::autoDelete=true); + + session.exchangeBind(arg::exchange="amq.fanout", arg::queue=myQueue, arg::bindingKey="my-key"); + + // Create a listener and subscribe it to my queue. + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, myQueue); + + // Receive messages until the subscription is cancelled + // by Listener::received() + std::cout << "Listening" << std::endl; + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/fanout/verify b/RC9/qpid/cpp/examples/fanout/verify new file mode 100644 index 0000000000..2eaadff56b --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify @@ -0,0 +1,25 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Listening" ./listener +background "Listening" ./listener +background "Listening" ./listener +clients ./fanout_producer +outputs ./fanout_producer.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" "./listenerXX.out | remove_uuid" diff --git a/RC9/qpid/cpp/examples/fanout/verify.in b/RC9/qpid/cpp/examples/fanout/verify.in new file mode 100644 index 0000000000..8f8612ce67 --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify.in @@ -0,0 +1,43 @@ +==== fanout_producer.out +==== listener.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for +==== listenerXX.out | remove_uuid +Listening +Message: Message 0 +Message: Message 1 +Message: Message 2 +Message: Message 3 +Message: Message 4 +Message: Message 5 +Message: Message 6 +Message: Message 7 +Message: Message 8 +Message: Message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/RC9/qpid/cpp/examples/fanout/verify_cpp_python b/RC9/qpid/cpp/examples/fanout/verify_cpp_python new file mode 100644 index 0000000000..6a1ba7ad7d --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify_cpp_python @@ -0,0 +1,7 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/fanout +background "Subscribed" $py/fanout_consumer.py +background "Subscribed" $py/fanout_consumer.py +clients ./fanout_producer +outputs ./fanout_producer.out "$py/fanout_consumer.py.out | remove_uuid" "$py/fanout_consumer.pyX.out | remove_uuid" + diff --git a/RC9/qpid/cpp/examples/fanout/verify_cpp_python.in b/RC9/qpid/cpp/examples/fanout/verify_cpp_python.in new file mode 100644 index 0000000000..21bafe06de --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify_cpp_python.in @@ -0,0 +1,27 @@ +==== fanout_producer.out +==== fanout_consumer.py.out | remove_uuid +Subscribed to queue +Message 0 +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +That's all, folks! +==== fanout_consumer.pyX.out | remove_uuid +Subscribed to queue +Message 0 +Message 1 +Message 2 +Message 3 +Message 4 +Message 5 +Message 6 +Message 7 +Message 8 +Message 9 +That's all, folks! diff --git a/RC9/qpid/cpp/examples/fanout/verify_python_cpp b/RC9/qpid/cpp/examples/fanout/verify_python_cpp new file mode 100644 index 0000000000..d9b3361523 --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify_python_cpp @@ -0,0 +1,7 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/fanout +background "Listening" ./listener +background "Listening" ./listener +clients $py/fanout_producer.py +outputs $py/fanout_producer.py.out "./listener.out | remove_uuid" "./listenerX.out | remove_uuid" + diff --git a/RC9/qpid/cpp/examples/fanout/verify_python_cpp.in b/RC9/qpid/cpp/examples/fanout/verify_python_cpp.in new file mode 100644 index 0000000000..8f9e959053 --- /dev/null +++ b/RC9/qpid/cpp/examples/fanout/verify_python_cpp.in @@ -0,0 +1,29 @@ +==== fanout_producer.py.out +==== listener.out | remove_uuid +Listening +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for +==== listenerX.out | remove_uuid +Listening +Message: message 0 +Message: message 1 +Message: message 2 +Message: message 3 +Message: message 4 +Message: message 5 +Message: message 6 +Message: message 7 +Message: message 8 +Message: message 9 +Message: That's all, folks! +Shutting down listener for diff --git a/RC9/qpid/cpp/examples/makedist.mk b/RC9/qpid/cpp/examples/makedist.mk new file mode 100644 index 0000000000..4345378983 --- /dev/null +++ b/RC9/qpid/cpp/examples/makedist.mk @@ -0,0 +1,22 @@ +# Settings to build the examples in automake +AM_CXXFLAGS = $(WARNING_CFLAGS) +INCLUDES = -I$(top_srcdir)/src -I$(top_srcdir)/src/gen -I$(top_builddir)/src -I$(top_builddir)/src/gen +CLIENT_LIB=$(top_builddir)/src/libqpidclient.la +CONSOLE_LIB=$(top_builddir)/src/libqmfconsole.la +MAKELDFLAG ?= qpidclient + +# Generate a simple non-automake Makefile for distribution. +MAKEDIST=.libs/Makefile + +$(MAKEDIST): Makefile + mkdir -p .libs + @$(ECHO) CXX=$(CXX) > $(MAKEDIST) + @$(ECHO) CXXFLAGS=$(CXXFLAGS) >> $(MAKEDIST) + @$(ECHO) LDFLAGS=-l$(MAKELDFLAG) >> $(MAKEDIST) + @$(ECHO) >> $(MAKEDIST) + @$(ECHO) all: $(noinst_PROGRAMS) >> $(MAKEDIST) + @$(ECHO) >> $(MAKEDIST) + @$(ECHO) clean: >> $(MAKEDIST) + @$(ECHO) " rm -f $(noinst_PROGRAMS)" >> $(MAKEDIST) + + diff --git a/RC9/qpid/cpp/examples/pub-sub/Makefile.am b/RC9/qpid/cpp/examples/pub-sub/Makefile.am new file mode 100644 index 0000000000..bdbf0f8d20 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/Makefile.am @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/pub-sub + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=topic_listener topic_publisher + +topic_listener_SOURCES=topic_listener.cpp +topic_listener_LDADD=$(CLIENT_LIB) + +topic_publisher_SOURCES=topic_publisher.cpp +topic_publisher_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + topic_listener.cpp \ + topic_publisher.cpp \ + $(MAKEDIST) + +EXTRA_DIST= \ + $(examples_DATA) \ + verify \ + verify.in \ + verify_cpp_python \ + verify_cpp_python.in \ + verify_python_cpp \ + verify_python_cpp.in + + + + + diff --git a/RC9/qpid/cpp/examples/pub-sub/topic_listener.cpp b/RC9/qpid/cpp/examples/pub-sub/topic_listener.cpp new file mode 100644 index 0000000000..fe0280cb7e --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/topic_listener.cpp @@ -0,0 +1,174 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_listener.cpp: + * + * This program is one of two programs designed to be used + * together. These programs implement a publish-subscribe example + * using the "amq.topic" exchange. + * + * topic_publisher.cpp + * + * Sends messages to the "amq.topic" exchange, using the + * multipart routing keys "usa.news", "usa.weather", + * "europe.news", and "europe.weather". + * + * topic_listener.cpp (this program) + * + * Creates private queues for "news", "weather", "usa", and + * "europe", binding them to the amq.topic exchange using + * bindings that match the corresponding parts of the multipart + * routing keys. + * + * Multiple listeners can be run at the same time. + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener { + private: + Session& session; + SubscriptionManager subscriptions; + public: + Listener(Session& session); + virtual void prepareQueue(std::string queue, std::string exchange, std::string routing_key); + virtual void received(Message& message); + virtual void listen(); + ~Listener() { }; +}; + + +/* + * Listener::Listener + * + * Subscribe to the queue, route it to a client destination for the + * listener. (The destination name merely identifies the destination + * in the listener, you can use any name as long as you use the same + * name for the listener). + */ + +Listener::Listener(Session& session) : + session(session), + subscriptions(session) +{ +} + + +void Listener::prepareQueue(std::string queue, std::string exchange, std::string routing_key) { + + /* Create a unique queue name for this consumer by concatenating + * the queue name parameter with the Session ID. + */ + + queue += session.getId().getName(); + std::cout << "Declaring queue: " << queue << std::endl; + + /* Declare an exclusive queue on the broker + */ + + session.queueDeclare(arg::queue=queue, arg::exclusive=true, arg::autoDelete=true); + + /* Route messages to the new queue if they match the routing key. + * + * Also route any messages to with the "control" routing key to + * this queue so we know when it's time to stop. A publisher sends + * a message with the content "That's all, Folks!", using the + * "control" routing key, when it is finished. + */ + + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey=routing_key); + session.exchangeBind(arg::exchange=exchange, arg::queue=queue, arg::bindingKey="control"); + + /* + * subscribe to the queue using the subscription manager. + */ + + std::cout << "Subscribing to queue " << queue << std::endl; + subscriptions.subscribe(*this, queue); +} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << " from " << message.getDestination() << std::endl; + + if (message.getData() == "That's all, folks!") { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +void Listener::listen() { + // Receive messages + subscriptions.run(); +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + std::string exchange = argc>3 ? argv[3] : "amq.topic"; + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Create a listener for the session + + Listener listener(session); + + // Subscribe to messages on the queues we are interested in + + listener.prepareQueue("usa", exchange, "usa.#"); + listener.prepareQueue("europe", exchange, "europe.#"); + listener.prepareQueue("news", exchange, "#.news"); + listener.prepareQueue("weather", exchange, "#.weather"); + + std::cout << "Listening for messages ..." << std::endl; + + // Give up control and receive messages + listener.listen(); + + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/pub-sub/topic_publisher.cpp b/RC9/qpid/cpp/examples/pub-sub/topic_publisher.cpp new file mode 100644 index 0000000000..72ba572f75 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/topic_publisher.cpp @@ -0,0 +1,130 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +/** + * topic_publisher.cpp: + * + * This program is one of two programs designed to be used + * together. These programs implement a publish-subscribe example + * using the "amq.topic" exchange. + * + * topic_publisher.cpp (this program) + * + * Sends messages to the "amq.topic" exchange, using the + * multipart routing keys "usa.news", "usa.weather", + * "europe.news", and "europe.weather". + * + * topic_listener.cpp + * + * Creates private queues for "news", "weather", "usa", and + * "europe", binding them to the amq.topic exchange using + * bindings that match the corresponding parts of the multipart + * routing keys. + * + * Multiple listeners can be run at the same time. + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +void publish_messages(Session& session, string routing_key) +{ + Message message; + + // Set the routing key once, we'll use the same routing key for all + // messages. + + message.getDeliveryProperties().setRoutingKey(routing_key); + for (int i=0; i<5; i++) { + stringstream message_data; + message_data << "Message " << i; + + message.setData(message_data.str()); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="amq.topic"); + } + +} + +/* + * no_more_messages() + * + * Send a message to indicate that no more messages are coming. + * Use the 'control' routing key (see comments in topic_config_queues.cpp). + * + */ + +void no_more_messages(Session& session) +{ + Message message; + + message.getDeliveryProperties().setRoutingKey("control"); + message.setData("That's all, folks!"); + session.messageTransfer(arg::content=message, arg::destination="amq.topic"); +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + publish_messages(session, "usa.news"); + publish_messages(session, "usa.weather"); + publish_messages(session, "europe.news"); + publish_messages(session, "europe.weather"); + + no_more_messages(session); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/pub-sub/verify b/RC9/qpid/cpp/examples/pub-sub/verify new file mode 100644 index 0000000000..528d2f401e --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Listening" ./topic_listener +clients ./topic_publisher +outputs ./topic_publisher.out "topic_listener.out | remove_uuid | sort" diff --git a/RC9/qpid/cpp/examples/pub-sub/verify.in b/RC9/qpid/cpp/examples/pub-sub/verify.in new file mode 100644 index 0000000000..6413c5c788 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify.in @@ -0,0 +1,59 @@ +==== topic_publisher.out +==== topic_listener.out | remove_uuid | sort +Declaring queue: europe +Declaring queue: news +Declaring queue: usa +Declaring queue: weather +Listening for messages ... +Message: Message 0 from europe +Message: Message 0 from europe +Message: Message 0 from news +Message: Message 0 from news +Message: Message 0 from usa +Message: Message 0 from usa +Message: Message 0 from weather +Message: Message 0 from weather +Message: Message 1 from europe +Message: Message 1 from europe +Message: Message 1 from news +Message: Message 1 from news +Message: Message 1 from usa +Message: Message 1 from usa +Message: Message 1 from weather +Message: Message 1 from weather +Message: Message 2 from europe +Message: Message 2 from europe +Message: Message 2 from news +Message: Message 2 from news +Message: Message 2 from usa +Message: Message 2 from usa +Message: Message 2 from weather +Message: Message 2 from weather +Message: Message 3 from europe +Message: Message 3 from europe +Message: Message 3 from news +Message: Message 3 from news +Message: Message 3 from usa +Message: Message 3 from usa +Message: Message 3 from weather +Message: Message 3 from weather +Message: Message 4 from europe +Message: Message 4 from europe +Message: Message 4 from news +Message: Message 4 from news +Message: Message 4 from usa +Message: Message 4 from usa +Message: Message 4 from weather +Message: Message 4 from weather +Message: That's all, folks! from europe +Message: That's all, folks! from news +Message: That's all, folks! from usa +Message: That's all, folks! from weather +Shutting down listener for europe +Shutting down listener for news +Shutting down listener for usa +Shutting down listener for weather +Subscribing to queue europe +Subscribing to queue news +Subscribing to queue usa +Subscribing to queue weather diff --git a/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python b/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python new file mode 100644 index 0000000000..f6c6850981 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python @@ -0,0 +1,6 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/pubsub +background "Queues created" $py/topic_subscriber.py +clients ./topic_publisher +outputs ./topic_publisher.out "$py/topic_subscriber.py.out | remove_uuid | sort" + diff --git a/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python.in b/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python.in new file mode 100644 index 0000000000..951d9ad9dd --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify_cpp_python.in @@ -0,0 +1,55 @@ +==== topic_publisher.out +==== topic_subscriber.py.out | remove_uuid | sort +Message 0 +Message 0 +Message 0 +Message 0 +Message 0 +Message 0 +Message 0 +Message 0 +Message 1 +Message 1 +Message 1 +Message 1 +Message 1 +Message 1 +Message 1 +Message 1 +Message 2 +Message 2 +Message 2 +Message 2 +Message 2 +Message 2 +Message 2 +Message 2 +Message 3 +Message 3 +Message 3 +Message 3 +Message 3 +Message 3 +Message 3 +Message 3 +Message 4 +Message 4 +Message 4 +Message 4 +Message 4 +Message 4 +Message 4 +Message 4 +Messages on 'europe' queue: +Messages on 'news' queue: +Messages on 'usa' queue: +Messages on 'weather' queue: +Queues created - please start the topic producer +Subscribing local queue 'local_europe' to europe-' +Subscribing local queue 'local_news' to news-' +Subscribing local queue 'local_usa' to usa-' +Subscribing local queue 'local_weather' to weather-' +That's all, folks! +That's all, folks! +That's all, folks! +That's all, folks! diff --git a/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp b/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp new file mode 100644 index 0000000000..2ddaad58c2 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp @@ -0,0 +1,6 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +py=$PYTHON_EXAMPLES/pubsub +background "Listening" ./topic_listener +clients $py/topic_publisher.py +outputs $py/topic_publisher.py.out "topic_listener.out | remove_uuid | sort" + diff --git a/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp.in b/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp.in new file mode 100644 index 0000000000..52e8db9d72 --- /dev/null +++ b/RC9/qpid/cpp/examples/pub-sub/verify_python_cpp.in @@ -0,0 +1,59 @@ +==== topic_publisher.py.out +==== topic_listener.out | remove_uuid | sort +Declaring queue: europe +Declaring queue: news +Declaring queue: usa +Declaring queue: weather +Listening for messages ... +Message: europe.news 0 from europe +Message: europe.news 0 from news +Message: europe.news 1 from europe +Message: europe.news 1 from news +Message: europe.news 2 from europe +Message: europe.news 2 from news +Message: europe.news 3 from europe +Message: europe.news 3 from news +Message: europe.news 4 from europe +Message: europe.news 4 from news +Message: europe.weather 0 from europe +Message: europe.weather 0 from weather +Message: europe.weather 1 from europe +Message: europe.weather 1 from weather +Message: europe.weather 2 from europe +Message: europe.weather 2 from weather +Message: europe.weather 3 from europe +Message: europe.weather 3 from weather +Message: europe.weather 4 from europe +Message: europe.weather 4 from weather +Message: That's all, folks! from europe +Message: That's all, folks! from news +Message: That's all, folks! from usa +Message: That's all, folks! from weather +Message: usa.news 0 from news +Message: usa.news 0 from usa +Message: usa.news 1 from news +Message: usa.news 1 from usa +Message: usa.news 2 from news +Message: usa.news 2 from usa +Message: usa.news 3 from news +Message: usa.news 3 from usa +Message: usa.news 4 from news +Message: usa.news 4 from usa +Message: usa.weather 0 from usa +Message: usa.weather 0 from weather +Message: usa.weather 1 from usa +Message: usa.weather 1 from weather +Message: usa.weather 2 from usa +Message: usa.weather 2 from weather +Message: usa.weather 3 from usa +Message: usa.weather 3 from weather +Message: usa.weather 4 from usa +Message: usa.weather 4 from weather +Shutting down listener for europe +Shutting down listener for news +Shutting down listener for usa +Shutting down listener for weather +Subscribing to queue europe +Subscribing to queue news +Subscribing to queue usa +Subscribing to queue weather diff --git a/RC9/qpid/cpp/examples/qmf-agent/Makefile b/RC9/qpid/cpp/examples/qmf-agent/Makefile new file mode 100644 index 0000000000..4c5daa6888 --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-agent/Makefile @@ -0,0 +1,85 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +SRC_DIR = . +QPID_DIR = ../../.. +SCHEMA_FILE = $(SRC_DIR)/schema.xml +GEN_DIR = $(SRC_DIR)/gen +OUT_FILE = $(SRC_DIR)/qmf-agent + +CC = gcc +LIB_DIR = $(QPID_DIR)/cpp/src/.libs +CC_INCLUDES = -I$(SRC_DIR) -I$(QPID_DIR)/cpp/src -I$(QPID_DIR)/cpp/src/gen -I$(GEN_DIR) +CC_FLAGS = -g -O3 +LD_FLAGS = -lqmfagent -L$(LIB_DIR) +SPEC_DIR = $(QPID_DIR)/specs +MGEN_DIR = $(QPID_DIR)/cpp/managementgen +MGEN = $(MGEN_DIR)/qmf-gen + +vpath %.cpp $(SRC_DIR):$(GEN_DIR) +vpath %.d $(OBJ_DIR) +vpath %.o $(OBJ_DIR) + +cpps = $(wildcard $(SRC_DIR)/*.cpp) +cpps += $(wildcard $(GEN_DIR)/qmf/org/apache/qpid/agent/example/*.cpp) +deps = $(addsuffix .d, $(basename $(cpps))) +objects = $(addsuffix .o, $(basename $(cpps))) + +.PHONY: all clean gen + +#========================================================== +# Pass 0: generate source files from schema +ifeq ($(MAKELEVEL), 0) + +all: gen + @$(MAKE) + +gen: + $(MGEN) -o $(GEN_DIR)/qmf $(SCHEMA_FILE) + +clean: + rm -rf $(GEN_DIR) $(OUT_FILE) *.d *.o + + +#========================================================== +# Pass 1: generate dependencies +else ifeq ($(MAKELEVEL), 1) + +all: $(deps) + @$(MAKE) + +%.d : %.cpp + $(CC) -M $(CC_FLAGS) $(CC_INCLUDES) $< > $@ + + +#========================================================== +# Pass 2: build project +else ifeq ($(MAKELEVEL), 2) + +$(OUT_FILE) : $(objects) + $(CC) -o $(OUT_FILE) $(CC_FLAGS) $(LD_FLAGS) $(objects) + +include $(deps) + +%.o : %.cpp + $(CC) -c $(CC_FLAGS) $(CC_INCLUDES) -o $@ $< + +endif + + diff --git a/RC9/qpid/cpp/examples/qmf-agent/example.cpp b/RC9/qpid/cpp/examples/qmf-agent/example.cpp new file mode 100644 index 0000000000..4dec014370 --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-agent/example.cpp @@ -0,0 +1,200 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include <qpid/management/Manageable.h> +#include <qpid/management/ManagementObject.h> +#include <qpid/agent/ManagementAgent.h> +#include <qpid/sys/Mutex.h> +#include "qmf/org/apache/qpid/agent/example/Parent.h" +#include "qmf/org/apache/qpid/agent/example/Child.h" +#include "qmf/org/apache/qpid/agent/example/ArgsParentCreate_child.h" +#include "qmf/org/apache/qpid/agent/example/EventChildCreated.h" +#include "qmf/org/apache/qpid/agent/example/Package.h" + +#include <signal.h> +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace std; +using qpid::management::ManagementAgent; +using qpid::management::ManagementObject; +using qpid::management::Manageable; +using qpid::management::Args; +using qpid::sys::Mutex; +namespace _qmf = qmf::org::apache::qpid::agent::example; + +class ChildClass; + +//============================================================== +// CoreClass is the operational class that corresponds to the +// "Parent" class in the management schema. +//============================================================== +class CoreClass : public Manageable +{ + string name; + ManagementAgent* agent; + _qmf::Parent* mgmtObject; + std::vector<ChildClass*> children; + Mutex vectorLock; + +public: + + CoreClass(ManagementAgent* agent, string _name); + ~CoreClass() { mgmtObject->resourceDestroy(); } + + ManagementObject* GetManagementObject(void) const + { return mgmtObject; } + + void doLoop(); + status_t ManagementMethod (uint32_t methodId, Args& args, string& text); +}; + +class ChildClass : public Manageable +{ + string name; + _qmf::Child* mgmtObject; + +public: + + ChildClass(ManagementAgent* agent, CoreClass* parent, string name); + ~ChildClass() { mgmtObject->resourceDestroy(); } + + ManagementObject* GetManagementObject(void) const + { return mgmtObject; } + + void doWork() + { + mgmtObject->inc_count(2); + } +}; + +CoreClass::CoreClass(ManagementAgent* _agent, string _name) : name(_name), agent(_agent) +{ + static uint64_t persistId = 0x111222333444555LL; + mgmtObject = new _qmf::Parent(agent, this, name); + + agent->addObject(mgmtObject, persistId++); + mgmtObject->set_state("IDLE"); +} + +void CoreClass::doLoop() +{ + // Periodically bump a counter to provide a changing statistical value + while (1) { + sleep(1); + mgmtObject->inc_count(); + mgmtObject->set_state("IN_LOOP"); + + { + Mutex::ScopedLock _lock(vectorLock); + + for (std::vector<ChildClass*>::iterator iter = children.begin(); + iter != children.end(); + iter++) { + (*iter)->doWork(); + } + } + } +} + +Manageable::status_t CoreClass::ManagementMethod(uint32_t methodId, Args& args, string& /*text*/) +{ + Mutex::ScopedLock _lock(vectorLock); + + switch (methodId) { + case _qmf::Parent::METHOD_CREATE_CHILD: + _qmf::ArgsParentCreate_child& ioArgs = (_qmf::ArgsParentCreate_child&) args; + + ChildClass *child = new ChildClass(agent, this, ioArgs.i_name); + ioArgs.o_childRef = child->GetManagementObject()->getObjectId(); + + children.push_back(child); + + agent->raiseEvent(_qmf::EventChildCreated(ioArgs.i_name)); + + return STATUS_OK; + } + + return STATUS_NOT_IMPLEMENTED; +} + +ChildClass::ChildClass(ManagementAgent* agent, CoreClass* parent, string name) +{ + mgmtObject = new _qmf::Child(agent, this, parent, name); + + agent->addObject(mgmtObject); +} + + +//============================================================== +// Main program +//============================================================== + +ManagementAgent::Singleton* singleton; + +void shutdown(int) +{ + delete singleton; + exit(0); +} + +int main_int(int argc, char** argv) +{ + singleton = new ManagementAgent::Singleton(); + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + qpid::client::ConnectionSettings settings; + + settings.host = host; + settings.port = port; + + signal(SIGINT, shutdown); + + // Create the qmf management agent + ManagementAgent* agent = singleton->getInstance(); + + // Register the Qmf_example schema with the agent + _qmf::Package packageInit(agent); + + // Start the agent. It will attempt to make a connection to the + // management broker + agent->init(settings, 5, false, ".magentdata"); + + // Allocate some core objects + CoreClass core1(agent, "Example Core Object #1"); + CoreClass core2(agent, "Example Core Object #2"); + CoreClass core3(agent, "Example Core Object #3"); + + core1.doLoop(); +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/RC9/qpid/cpp/examples/qmf-agent/schema.xml b/RC9/qpid/cpp/examples/qmf-agent/schema.xml new file mode 100644 index 0000000000..1bf701a655 --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-agent/schema.xml @@ -0,0 +1,64 @@ +<schema package="org.apache.qpid.agent.example"> + +<!-- + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. See the License for the + specific language governing permissions and limitations + under the License. +--> + + <!-- + =============================================================== + Parent + =============================================================== + --> + <class name="Parent"> + + This class represents a parent object + + <property name="name" type="sstr" access="RC" index="y"/> + + <statistic name="state" type="sstr" desc="Operational state of the link"/> + <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> + + <method name="create_child" desc="Create child object"> + <arg name="name" dir="I" type="sstr"/> + <arg name="childRef" dir="O" type="objId"/> + </method> + </class> + + + <!-- + =============================================================== + Child + =============================================================== + --> + <class name="Child"> + <property name="ParentRef" type="objId" references="Parent" access="RC" index="y" parentRef="y"/> + <property name="name" type="sstr" access="RC" index="y"/> + + <statistic name="count" type="count64" unit="tick" desc="Counter that increases monotonically"/> + + <method name="delete"/> + </class> + + <eventArguments> + <arg name="childName" type="sstr"/> + </eventArguments> + + <event name="ChildCreated" args="childName"/> + <event name="ChildDestroyed" args="childName"/> +</schema> + diff --git a/RC9/qpid/cpp/examples/qmf-console/Makefile.am b/RC9/qpid/cpp/examples/qmf-console/Makefile.am new file mode 100644 index 0000000000..e115fc0fb0 --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-console/Makefile.am @@ -0,0 +1,41 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +examplesdir=$(pkgdatadir)/examples/qmf-console + +MAKELDFLAG = qmfconsole +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=console printevents ping + +console_SOURCES=console.cpp +console_LDADD=$(CONSOLE_LIB) + +printevents_SOURCES=printevents.cpp +printevents_LDADD=$(CONSOLE_LIB) + +ping_SOURCES=ping.cpp +ping_LDADD=$(CONSOLE_LIB) + +examples_DATA= \ + console.cpp \ + printevents.cpp \ + ping.cpp \ + $(MAKEDIST) + diff --git a/RC9/qpid/cpp/examples/qmf-console/console.cpp b/RC9/qpid/cpp/examples/qmf-console/console.cpp new file mode 100644 index 0000000000..5700d5556f --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-console/console.cpp @@ -0,0 +1,150 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/ConsoleListener.h" +#include "qpid/console/SessionManager.h" + +using namespace std; +using namespace qpid::console; + +class Listener : public ConsoleListener { +public: + ~Listener() {} + + void brokerConnected(const Broker& broker) { + cout << "brokerConnected: " << broker << endl; + } + + void brokerDisconnected(const Broker& broker) { + cout << "brokerDisconnected: " << broker << endl; + } + + void newPackage(const std::string& name) { + cout << "newPackage: " << name << endl; + } + + void newClass(const ClassKey& classKey) { + cout << "newClass: key=" << classKey << endl; + } + + void newAgent(const Agent& agent) { + cout << "newAgent: " << agent << endl; + } + + void delAgent(const Agent& agent) { + cout << "delAgent: " << agent << endl; + } + + void objectProps(Broker& broker, Object& object) { + cout << "objectProps: broker=" << broker << " object=" << object << endl; + } + + void objectStats(Broker& broker, Object& object) { + cout << "objectStats: broker=" << broker << " object=" << object << endl; + } + + void event(Event& event) { + cout << "event: " << event << endl; + } +}; + + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + //Listener listener; + qpid::client::ConnectionSettings settings; + + cout << "Creating SessionManager" << endl; + SessionManager sm; + cout << "Adding broker" << endl; + Broker* broker; + + broker = sm.addBroker(settings); + + cout << "Package List:" << endl; + vector<string> packages; + sm.getPackages(packages); + for (vector<string>::iterator iter = packages.begin(); iter != packages.end(); iter++) { + cout << " " << *iter << endl; + SessionManager::KeyVector classKeys; + sm.getClasses(classKeys, *iter); + for (SessionManager::KeyVector::iterator cIter = classKeys.begin(); + cIter != classKeys.end(); cIter++) + cout << " " << *cIter << endl; + } + + Object::Vector list; + cout << "getting exchanges..." << endl; + sm.getObjects(list, "exchange"); + cout << " returned " << list.size() << " elements" << endl; + + for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) { + cout << "exchange: " << *i << endl; + } + + list.clear(); + cout << "getting queues..." << endl; + sm.getObjects(list, "queue"); + cout << " returned " << list.size() << " elements" << endl; + + for (Object::Vector::iterator i = list.begin(); i != list.end(); i++) { + cout << "queue: " << *i << endl; + cout << " bindingCount=" << i->attrUint("bindingCount") << endl; + cout << " arguments=" << i->attrMap("arguments") << endl; + } + + list.clear(); + sm.getObjects(list, "broker"); + if (list.size() == 1) { + Object& broker = *list.begin(); + + cout << "Broker: " << broker << endl; + + Object::AttributeMap args; + MethodResponse result; + args.addUint("sequence", 1); + args.addString("body", "Testing..."); + + cout << "Call echo method..." << endl; + broker.invokeMethod("echo", args, result); + cout << "Result: code=" << result.code << " text=" << result.text << endl; + for (Object::AttributeMap::iterator aIter = result.arguments.begin(); + aIter != result.arguments.end(); aIter++) { + cout << " Output Arg: " << aIter->first << " => " << aIter->second->str() << endl; + } + } + + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/RC9/qpid/cpp/examples/qmf-console/ping.cpp b/RC9/qpid/cpp/examples/qmf-console/ping.cpp new file mode 100644 index 0000000000..debca7428a --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-console/ping.cpp @@ -0,0 +1,129 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/SessionManager.h" + +using namespace std; +using namespace qpid::console; + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + // + // Declare connection settings for the messaging broker. The settings default to + // localhost:5672 with user guest (password guest). Refer to the header file + // <qpid/client/ConnectionSettings.h> for full details. + // + qpid::client::ConnectionSettings connSettings; + + // + // Declare the (optional) session manager settings. Override the default timeout + // for methods calls to be 2 seconds. + // + SessionManager::Settings smSettings; + smSettings.methodTimeout = 2; + + // + // Declare the console session manager. With a null listener argument, it defaults to + // synchronous-only access mode. + // + SessionManager sm(0, smSettings); + + // + // Add a broker connection to the session manager. + // + Broker* broker = sm.addBroker(connSettings); + + uint32_t count = 5; // The number of echo requests we will send to the broker. + Object::Vector list; // A container for holding objects retrieved from the broker. + + // + // Query for a list of 'broker' objects from the Management Database + // + sm.getObjects(list, "broker"); + + // + // We expect one object (since we are connected to only one broker) + // + if (list.size() == 1) { + Object& brokerObject = *(list.begin()); + + for (uint32_t iter = 0; iter < count; iter++) { + // + // Declare a container for arguments to be sent with the "echo" method + // that we will invoke on the remote "broker" object. + // + Object::AttributeMap args; + + // + // Declare a container for the result of the method invocation. + // + MethodResponse result; + + // + // Set the values of the input arguments. + // + args.addUint("sequence", iter); + args.addString("body", "ABCDEFGHIJKLMNOPQRSTUVWXYZ"); + + cout << "Ping Broker: " << broker->getUrl() << "... "; + cout.flush(); + + // + // Invoke the method. This is a synchronous operation that will block until + // the method completes and returns a result. + // + brokerObject.invokeMethod("echo", args, result); + + // + // result.code is the return code (0 => Success) + // result.text is the return text + // result.arguments is a container (of type Object::AttributeMap) that holds + // the output arguments (if any) from the method. + // + cout << "Result: code=" << result.code << " text=" << result.text; + if (result.code == 0) + cout << " seq=" << result.arguments["sequence"]->asUint(); + cout << endl; + + if (result.code == 0 && iter < count - 1) + ::sleep(1); + } + } + + // + // Disconnect the broker from the session manager. + // + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/RC9/qpid/cpp/examples/qmf-console/printevents.cpp b/RC9/qpid/cpp/examples/qmf-console/printevents.cpp new file mode 100644 index 0000000000..bbec2c1af0 --- /dev/null +++ b/RC9/qpid/cpp/examples/qmf-console/printevents.cpp @@ -0,0 +1,105 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "qpid/console/ConsoleListener.h" +#include "qpid/console/SessionManager.h" +#include "qpid/sys/Time.h" + +using namespace std; +using namespace qpid::console; + +// +// Define a listener class to receive asynchronous events. +// +class Listener : public ConsoleListener { +public: + void brokerConnected(const Broker& broker) { + cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerConnected broker=" << + broker.getUrl() << endl; + } + + void brokerDisconnected(const Broker& broker) { + cout << qpid::sys::now() << " NOTIC qpid-printevents:brokerDisonnected broker=" << + broker.getUrl() << endl; + } + + void event(Event& event) { + cout << event << endl; + } +}; + + +//============================================================== +// Main program +//============================================================== +int main_int(int /*argc*/, char** /*argv*/) +{ + // + // Declare an instance of our listener. + // + Listener listener; + + // + // Declare connection settings for the messaging broker. The settings default to + // localhost:5672 with user guest (password guest). Refer to the header file + // <qpid/client/ConnectionSettings.h> for full details. + // + qpid::client::ConnectionSettings connSettings; + + // + // Declare the (optional) session manager settings. Disable the reception of + // object updates and heartbeats. Note that by disabling the reception of things + // we don't need, we don't unnecessarily use network bandwidth. + // + SessionManager::Settings smSettings; + smSettings.rcvObjects = false; + smSettings.rcvHeartbeats = false; + + // + // Declare the console session manager. + // + SessionManager sm(&listener, smSettings); + + // + // Add a broker connection to the session manager. If desired, multiple brokers may + // be connected. + // + Broker* broker = sm.addBroker(connSettings); + + // + // Sleep indefinitely while asynchronous events are handled by the listener. + // + for (;;) + ::sleep(1); + + sm.delBroker(broker); + return 0; +} + +int main(int argc, char** argv) +{ + try { + return main_int(argc, argv); + } catch(std::exception& e) { + cout << "Top Level Exception: " << e.what() << endl; + } +} + diff --git a/RC9/qpid/cpp/examples/request-response/Makefile.am b/RC9/qpid/cpp/examples/request-response/Makefile.am new file mode 100644 index 0000000000..8fb95f24fc --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/Makefile.am @@ -0,0 +1,48 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/request-response + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=client server + +client_SOURCES=client.cpp +client_LDADD=$(CLIENT_LIB) + +server_SOURCES=server.cpp +server_LDADD=$(CLIENT_LIB) + +examples_DATA= \ + server.cpp \ + client.cpp \ + $(MAKEDIST) + +EXTRA_DIST= \ + $(examples_DATA) \ + verify \ + verify.in \ + verify_cpp_python \ + verify_cpp_python.in \ + verify_python_cpp \ + verify_python_cpp.in + + + + + diff --git a/RC9/qpid/cpp/examples/request-response/client.cpp b/RC9/qpid/cpp/examples/request-response/client.cpp new file mode 100644 index 0000000000..b1f8d0b587 --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/client.cpp @@ -0,0 +1,164 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * client.cpp + * + * This program is one of two programs that illustrate the + * request/response pattern. + * + * + * client.cpp (this program) + * + * A client application that sends messages to the "amq.direct" + * exchange, using the routing key "request" to route messages to + * the server. + * + * Each instance of the client creates its own private response + * queue, binding it to the "amq.direct" exchange using it's + * session identifier as the routing key, and places its session + * identifier in the "reply-to" property of each message it sends. + * + * + * server.cpp + * + * A service that accepts messages from a request queue, converts + * their content to upper case, and sends the result to the + * original sender. + * + * This program creates a request queue, binds it to "amq.direct" + * using the routing key "request", then receives messages from + * the request queue. Each incoming message is converted to upper + * case, then sent to the "amq.direct" exchange using the + * request's reply-to property as the routing key for the + * response. + * + * + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + int counter; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs), counter(0) +{} + +void Listener::received(Message& message) { + std::cout << "Response: " << message.getData() << std::endl; + + ++ counter; + if (counter > 3) { + std::cout << "Shutting down listener for " << message.getDestination() << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Create a response queue so the server can send us responses + // to our requests. Use the client's session ID as the name + // of the response queue. + + stringstream response_queue; + response_queue << "client" << session.getId().getName(); + + // Use the name of the response queue as the routing key + + session.queueDeclare(arg::queue=response_queue.str()); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=response_queue.str(), arg::bindingKey=response_queue.str()); + + // Each client sends the name of their own response queue so + // the service knows where to route messages. + + Message request; + request.getDeliveryProperties().setRoutingKey("request"); + request.getMessageProperties().setReplyTo(ReplyTo("amq.direct", response_queue.str())); + + // Create a listener for the response queue and listen for response messages. + std::cout << "Activating response queue listener for: " << response_queue.str() << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions); + subscriptions.subscribe(listener, response_queue.str()); + + // Now send some requests ... + + string s[] = { + "Twas brillig, and the slithy toves", + "Did gire and gymble in the wabe.", + "All mimsy were the borogroves,", + "And the mome raths outgrabe." + }; + + + for (int i=0; i<4; i++) { + request.setData(s[i]); + session.messageTransfer(arg::content=request, arg::destination="amq.direct"); + std::cout << "Request: " << s[i] << std::endl; + } + + std::cout << "Waiting for all responses to arrive ..." << std::endl; + subscriptions.run(); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/request-response/server.cpp b/RC9/qpid/cpp/examples/request-response/server.cpp new file mode 100644 index 0000000000..2d62638dff --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/server.cpp @@ -0,0 +1,162 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * server.cpp + * + * This program is one of two programs that illustrate the + * request/response pattern. + * + * + * client.cpp + * + * A client application that sends messages to the "amq.direct" + * exchange, using the routing key "request" to route messages to + * the server. + * + * Each instance of the client creates its own private response + * queue, binding it to the "amq.direct" exchange using it's + * session identifier as the routing key, and places its session + * identifier in the "reply-to" property of each message it sends. + * + * + * server.cpp (this program) + * + * A service that accepts messages from a request queue, converts + * their content to upper case, and sends the result to the + * original sender. + * + * This program creates a request queue, binds it to "amq.direct" + * using the routing key "request", then receives messages from + * the request queue. Each incoming message is converted to upper + * case, then sent to the "amq.direct" exchange using the + * request's reply-to property as the routing key for the + * response. + * + * + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> +#include <qpid/client/MessageListener.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> +#include <string> + +using namespace qpid::client; +using namespace qpid::framing; +using std::stringstream; +using std::string; + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + AsyncSession asyncSession; + public: + Listener(SubscriptionManager& subscriptions, Session& session); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs, Session& session) + : subscriptions(subs), asyncSession(session) +{} + +void Listener::received(Message& request) { + Message response; + + // Get routing key for response from the request's replyTo property + string routingKey; + + if (request.getMessageProperties().hasReplyTo()) { + routingKey = request.getMessageProperties().getReplyTo().getRoutingKey(); + } else { + std::cout << "Error: " << "No routing key for request (" << request.getData() << ")" << std::endl; + return; + } + + std::cout << "Request: " << request.getData() << " (" <<routingKey << ")" << std::endl; + + // Transform message content to upper case + std::string s = request.getData(); + std::transform (s.begin(), s.end(), s.begin(), toupper); + response.setData(s); + + // Send it back to the user + response.getDeliveryProperties().setRoutingKey(routingKey); + + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + asyncSession.messageTransfer(arg::content=response, arg::destination="amq.direct"); +} + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + + // Create a request queue for clients to use when making + // requests. + string request_queue = "request"; + + // Use the name of the request queue as the routing key + session.queueDeclare(arg::queue=request_queue); + session.exchangeBind(arg::exchange="amq.direct", arg::queue=request_queue, arg::bindingKey=request_queue); + + // Create a listener and subscribe it to the request_queue + std::cout << "Activating request queue listener for: " << request_queue << std::endl; + SubscriptionManager subscriptions(session); + Listener listener(subscriptions, session); + subscriptions.subscribe(listener, request_queue); + // Deliver messages until the subscription is cancelled + // by Listener::received() + + std::cout << "Waiting for requests" << std::endl; + subscriptions.run(); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/request-response/verify b/RC9/qpid/cpp/examples/request-response/verify new file mode 100644 index 0000000000..dee82413e7 --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify @@ -0,0 +1,24 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Waiting" ./server +clients ./client +kill %% # Must kill the server. +outputs "./client.out | remove_uuid" "server.out | remove_uuid" diff --git a/RC9/qpid/cpp/examples/request-response/verify.in b/RC9/qpid/cpp/examples/request-response/verify.in new file mode 100644 index 0000000000..7925dc5671 --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify.in @@ -0,0 +1,19 @@ +==== client.out | remove_uuid +Activating response queue listener for: client +Request: Twas brillig, and the slithy toves +Request: Did gire and gymble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Waiting for all responses to arrive ... +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GIRE AND GYMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +Shutting down listener for client +==== server.out | remove_uuid +Activating request queue listener for: request +Waiting for requests +Request: Twas brillig, and the slithy toves (client) +Request: Did gire and gymble in the wabe. (client) +Request: All mimsy were the borogroves, (client) +Request: And the mome raths outgrabe. (client) diff --git a/RC9/qpid/cpp/examples/request-response/verify_cpp_python b/RC9/qpid/cpp/examples/request-response/verify_cpp_python new file mode 100644 index 0000000000..867af3a92b --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify_cpp_python @@ -0,0 +1,6 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Request server running" $PYTHON_EXAMPLES/request-response/server.py +clients ./client +sleep 1 +kill %% # Must kill the server. +outputs "./client.out | remove_uuid" "$PYTHON_EXAMPLES/request-response/server.py.out | remove_uuid" diff --git a/RC9/qpid/cpp/examples/request-response/verify_cpp_python.in b/RC9/qpid/cpp/examples/request-response/verify_cpp_python.in new file mode 100644 index 0000000000..a032293d9b --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify_cpp_python.in @@ -0,0 +1,15 @@ +==== client.out | remove_uuid +Activating response queue listener for: client +Request: Twas brillig, and the slithy toves +Request: Did gire and gymble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Waiting for all responses to arrive ... +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GIRE AND GYMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +Shutting down listener for client +==== server.py.out | remove_uuid +Request server running - run your client now. +(Times out after 100 seconds ...) diff --git a/RC9/qpid/cpp/examples/request-response/verify_python_cpp b/RC9/qpid/cpp/examples/request-response/verify_python_cpp new file mode 100644 index 0000000000..d6f0fa7152 --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify_python_cpp @@ -0,0 +1,5 @@ +# See https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid/bin/verify +background "Waiting" ./server +clients $PYTHON_EXAMPLES/request-response/client.py +kill %% # Must kill the server. +outputs "$PYTHON_EXAMPLES/request-response/client.py.out | remove_uuid" "server.out | remove_uuid" diff --git a/RC9/qpid/cpp/examples/request-response/verify_python_cpp.in b/RC9/qpid/cpp/examples/request-response/verify_python_cpp.in new file mode 100644 index 0000000000..1500134619 --- /dev/null +++ b/RC9/qpid/cpp/examples/request-response/verify_python_cpp.in @@ -0,0 +1,18 @@ +==== client.py.out | remove_uuid +Request: Twas brillig, and the slithy toves +Request: Did gyre and gimble in the wabe. +Request: All mimsy were the borogroves, +Request: And the mome raths outgrabe. +Messages on queue: reply_to: +Response: TWAS BRILLIG, AND THE SLITHY TOVES +Response: DID GYRE AND GIMBLE IN THE WABE. +Response: ALL MIMSY WERE THE BOROGROVES, +Response: AND THE MOME RATHS OUTGRABE. +No more messages! +==== server.out | remove_uuid +Activating request queue listener for: request +Waiting for requests +Request: Twas brillig, and the slithy toves (reply_to:) +Request: Did gyre and gimble in the wabe. (reply_to:) +Request: All mimsy were the borogroves, (reply_to:) +Request: And the mome raths outgrabe. (reply_to:) diff --git a/RC9/qpid/cpp/examples/verify b/RC9/qpid/cpp/examples/verify new file mode 100755 index 0000000000..08dcf327a7 --- /dev/null +++ b/RC9/qpid/cpp/examples/verify @@ -0,0 +1,107 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + + +# Driver script to verify installed examples (also used for build tests.) +# +# Usage: verify example_dir [ example_dir ...] +# Where each example_dir must contain a verify sub-script to include. +# +# If $QPIDD is set, run a private QPIDD and use it. +# If $QPID_HOST or $QPID_PORT are set, use them to connect. +# + +QPID_DATA_DIR= +export QPID_DATA_DIR + +cleanup() { + test -n "$QPIDD" && $QPIDD -q # Private broker + kill %% > /dev/null 2>&1 # Leftover background jobs +} + +trap cleanup EXIT + +ARGS="${QPID_HOST:-localhost} $QPID_PORT" + +outfile() { + file=$1 + while [ -f $file.out ]; do file="${file}X"; done + echo $file.out + } + +fail() { test -n "$*" && echo $* 1>&2 ; FAIL=1; return 1; } + +client() { "$@" $ARGS > `outfile $*` || fail; } + +clients() { for cmd in "$@"; do client $cmd; done; } + +waitfor() { until grep -a -l "$2" $1 >/dev/null 2>&1 ; do sleep 1 ; done ; } + +background() { + pattern=$1; shift + out=`outfile $*` + eval "$* $ARGS > $out &" || { fail; return 1; } + waitfor $out "$pattern" +} + +name() { + for x in $*; do name="$name `basename $x`"; done + echo $name; +} + +outputs() { + wait 2> /dev/null # Wait for all backgroud processes to complete + rm -f $script.out + for f in "$@"; do + { echo "==== `name $f`"; eval "cat $f"; } >> $script.out || fail + done +} + +verify() { + FAIL= + if [ -d $1 ]; then dir=$1; script=verify; + else dir=`dirname $1`; script=`basename $1`; fi + cd $dir || return 1 + rm -f *.out + { source ./$script && diff -ac $script.out $script.in ; } || fail + test -z "$FAIL" && rm -f *.out + return $FAIL +} + +HEX="[a-fA-F0-9]" +remove_uuid() { + sed "s/$HEX\{8\}-$HEX\{4\}-$HEX\{4\}-$HEX\{4\}-$HEX\{12\}//g" $* +} +remove_uuid64() { + sed 's/[-A-Za-z0-9_]\{22\}==//g' $* +} + +# Start private broker if QPIDD is set. +if [ -n "$QPIDD" ] ; then + export QPID_PORT=`$QPIDD -dp0` || { echo "Cannot start $QPIDD" ; exit 1; } + trap "$QPIDD -q" EXIT +fi + +for example in "$@"; do + echo "== $example " + if ( verify $example; ) then echo "PASS"; else echo "FAIL"; RET=1; fi + done +exit $RET diff --git a/RC9/qpid/cpp/examples/verify_all b/RC9/qpid/cpp/examples/verify_all new file mode 100755 index 0000000000..d2bc9b67d1 --- /dev/null +++ b/RC9/qpid/cpp/examples/verify_all @@ -0,0 +1,53 @@ +#!/bin/sh + +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +# Verify all C++/python example combinations. +# + +verify=`dirname $0`/verify +topsrcdir=$1 +qpidd=$2 +broker_args=$3 +exclude_regexp=$4 +python=${QPID_PYTHON_DIR:-$topsrcdir/python} + +trap "$qpidd -q" exit +QPID_PORT=`$qpidd -dp0 $broker_args` || { echo "Can't run qpidd" ; exit 1; } +PYTHON_EXAMPLES=$python/examples +PYTHONPATH=$python:$PYTHONPATH +export QPID_PORT PYTHON_EXAMPLES PYTHONPATH + +test -d $PYTHON_EXAMPLES || echo "WARNING: No python examples. $PYTHON_EXAMPLES not found." +find="find" +test -d $PYTHON_EXAMPLES && find="$find $PYTHON_EXAMPLES" +find="$find -mindepth 2 -name verify" +test -d $PYTHON_EXAMPLES && \ + find="$find -o -name verify_cpp_python -o -name verify_python_cpp" +all_examples=`$find` + +if test -z "$exclude_regexp"; then + run_examples=$all_examples +else + for f in $all_examples; do + { cat $f | grep $exclude_regexp > /dev/null ; } || run_examples="$run_examples $f" + done +fi +$verify $run_examples diff --git a/RC9/qpid/cpp/examples/xml-exchange/Makefile.am b/RC9/qpid/cpp/examples/xml-exchange/Makefile.am new file mode 100644 index 0000000000..0f99aea44e --- /dev/null +++ b/RC9/qpid/cpp/examples/xml-exchange/Makefile.am @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# +examplesdir=$(pkgdatadir)/examples/xml-exchange + +include $(top_srcdir)/examples/makedist.mk + +noinst_PROGRAMS=declare_queues xml_producer listener + +declare_queues_SOURCES=declare_queues.cpp +declare_queues_LDADD=$(CLIENT_LIB) + +xml_producer_SOURCES=xml_producer.cpp +xml_producer_LDADD=$(CLIENT_LIB) + +listener_SOURCES=listener.cpp +listener_LDADD=$(CLIENT_LIB) + +EXTRA_DIST= \ + README + +examples_DATA= \ + $(EXTRA_DIST) \ + declare_queues.cpp \ + listener.cpp \ + xml_producer.cpp \ + $(MAKEDIST) + + + + + diff --git a/RC9/qpid/cpp/examples/xml-exchange/README b/RC9/qpid/cpp/examples/xml-exchange/README new file mode 100644 index 0000000000..85caebe352 --- /dev/null +++ b/RC9/qpid/cpp/examples/xml-exchange/README @@ -0,0 +1,53 @@ +This example shows how to program a simple application +using the XML Exchange. + +[Note: The XML Exchange is not a standard AMQP exchange type. To run +this example you need to have a broker that has support for the xml +exchange. If you are compiling the broker from source please refer to +the INSTALL notes from qpid.] + +To run the example, execute the programs in the +following order: + +1 ./declare_queues +2 ./listener +3 ./message_producer (in a separate window) + +The XML Exchange must be explicitly declared. Bindings +are established using queries in XQuery. These queries +can reference message content, message application +properties (which are declared as external variables +in the XQuery), or both. + +Once this is done, message producers publish to the +exchange using the exchange name and a routing key, +just as for other exchange types. Message consumers +read from the queues to which messages are routed. +If a message does not have XML content, or is +missing message application properties needed by +the query, the query is not routed. + +Queries can use message application headers to +provide functionality similar to JMS selectors. +If a query does not use the content of a message, +the message content is not parsed, and need not +be XML. + +The XQuery processor, XQilla, does path-based +document projection, so once the portion of +a document needed to evaluate a query has +been read, it stops parsing the document. +Suppose a long document has a header section. +You can indicate in the query that only +one header section needs to be queried, +and there is no need to parse the entire +document to see if there are further header +sections, using a path like this: + +./message/header[1]/date + +If you used a path like this, all children +of the message element would be read to +see if there are further headers: + +./message/header/date diff --git a/RC9/qpid/cpp/examples/xml-exchange/declare_queues.cpp b/RC9/qpid/cpp/examples/xml-exchange/declare_queues.cpp new file mode 100644 index 0000000000..d3a0f539b6 --- /dev/null +++ b/RC9/qpid/cpp/examples/xml-exchange/declare_queues.cpp @@ -0,0 +1,99 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * + * declare_queues.cpp + * + * This is one of three programs used to implement XML-based content + * routing in C++. + * + * declare_queues.cpp (this program) + * + * Creates a queue named "message_qaueue" on the broker, + * declares an XML Exchange, subscribes the queue to the XML + * Exchange using an XQuery in the binding, then exits. + * + * xml_producer.cpp + * + * Publishes messages to the XML Exchange. + * + * listener.cpp + * + * Reads messages from the "message_queue" queue. + */ + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::string; + + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + + try { + connection.open(host, port); + Session session = connection.newSession(); + + + //--------- Main body of program -------------------------------------------- + + // Set up queues, bind them with queries. Note that the XML exchange + // is not in the AMQP specification, so it is called "xml", not "amq.xml". + // Note that the XML exchange is not predeclared in Qpid, it must + // be declared by the application. + + session.queueDeclare(arg::queue="message_queue"); + session.exchangeDeclare(arg::exchange="xml", arg::type="xml"); + + // Application message properties are mapped to external variables + // in the XQuery. An XML Exchange can query message properties much + // like JMS, query the XML content of the message, or both. + + FieldTable binding; + binding.setString("xquery", "declare variable $control external;" + "./message/id mod 2 = 1 or $control = 'end'"); + session.exchangeBind(arg::exchange="xml", arg::queue="message_queue", arg::bindingKey="content_feed", arg::arguments=binding); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; + +} + + + diff --git a/RC9/qpid/cpp/examples/xml-exchange/listener.cpp b/RC9/qpid/cpp/examples/xml-exchange/listener.cpp new file mode 100644 index 0000000000..02b722c745 --- /dev/null +++ b/RC9/qpid/cpp/examples/xml-exchange/listener.cpp @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * + * listener.cpp + * + * This is one of three programs used to implement XML-based content + * routing in C++. + * + * declare_queues.cpp + * + * Creates a queue named "message_qaueue" on the broker, + * declares an XML Exchange, subscribes the queue to the XML + * Exchange using an XQuery in the binding, then exits. + * + * xml_producer.cpp + * + * Publishes messages to the XML Exchange. + * + * listener.cpp (this program) + * + * Reads messages from the "message_queue" queue. + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/Message.h> +#include <qpid/client/SubscriptionManager.h> + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +using namespace qpid::client; +using namespace qpid::framing; + + +class Listener : public MessageListener{ + private: + SubscriptionManager& subscriptions; + public: + Listener(SubscriptionManager& subscriptions); + virtual void received(Message& message); +}; + +Listener::Listener(SubscriptionManager& subs) : subscriptions(subs) +{} + +void Listener::received(Message& message) { + std::cout << "Message: " << message.getData() << std::endl; + if (message.getHeaders().getAsString("control") == "end") { + std::cout << "Shutting down listener for " << message.getDestination() + << std::endl; + subscriptions.cancel(message.getDestination()); + } +} + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + + Connection connection; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + SubscriptionManager subscriptions(session); + // Create a listener and subscribe it to the queue named "message_queue" + Listener listener(subscriptions); + subscriptions.subscribe(listener, "message_queue"); + // Receive messages until the subscription is cancelled + // by Listener::received() + subscriptions.run(); + + //--------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + diff --git a/RC9/qpid/cpp/examples/xml-exchange/xml_producer.cpp b/RC9/qpid/cpp/examples/xml-exchange/xml_producer.cpp new file mode 100644 index 0000000000..5cb75d0087 --- /dev/null +++ b/RC9/qpid/cpp/examples/xml-exchange/xml_producer.cpp @@ -0,0 +1,114 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + + +/** + * + * xml_producer.cpp + * + * This is one of three programs used to implement XML-based content + * routing in C++. + * + * declare_queues.cpp + * + * Creates a queue named "message_qaueue" on the broker, + * declares an XML Exchange, subscribes the queue to the XML + * Exchange using an XQuery in the binding, then exits. + * + * xml_producer.cpp (this program) + * + * Publishes messages to the XML Exchange. + * + * listener.cpp + * + * Reads messages from the "message_queue" queue. + */ + + +#include <qpid/client/Connection.h> +#include <qpid/client/Session.h> +#include <qpid/client/AsyncSession.h> +#include <qpid/client/Message.h> + + +#include <unistd.h> +#include <cstdlib> +#include <iostream> + +#include <sstream> + +using namespace qpid::client; +using namespace qpid::framing; + +using std::stringstream; +using std::string; + +int main(int argc, char** argv) { + const char* host = argc>1 ? argv[1] : "127.0.0.1"; + int port = argc>2 ? atoi(argv[2]) : 5672; + Connection connection; + Message message; + try { + connection.open(host, port); + Session session = connection.newSession(); + + //--------- Main body of program -------------------------------------------- + + // Publish some XML messages. Use the control property to + // indicate when we are finished. + // + // In the XML exchange, the routing key and the name of + // the query match. + + message.getDeliveryProperties().setRoutingKey("content_feed"); + message.getHeaders().setString("control","continue"); + + // Now send some messages ... + + for (int i=0; i<10; i++) { + stringstream message_data; + message_data << "<message><id>" << i << "</id></message>"; + + std::cout << "Message data: " << message_data.str() << std::endl; + + message.setData(message_data.str()); + // Asynchronous transfer sends messages as quickly as + // possible without waiting for confirmation. + async(session).messageTransfer(arg::content=message, arg::destination="xml"); + } + + // And send a final message to indicate termination. + + message.getHeaders().setString("control","end"); + message.setData("<end>That's all, folks!</end>"); + session.messageTransfer(arg::content=message, arg::destination="xml"); + + //----------------------------------------------------------------------------- + + connection.close(); + return 0; + } catch(const std::exception& error) { + std::cout << error.what() << std::endl; + } + return 1; +} + + |