diff options
author | Alan Conway <aconway@apache.org> | 2008-06-25 20:51:30 +0000 |
---|---|---|
committer | Alan Conway <aconway@apache.org> | 2008-06-25 20:51:30 +0000 |
commit | 4d560b89fa09056c22cd42e212c9ce8addeecb5a (patch) | |
tree | 3a15eb32ca85193ed5d2b97b5c6e7a6f053fb5f1 /cpp | |
parent | 830943be4ed6ae90edd2e2655720c0dcc721171d (diff) | |
download | qpid-python-4d560b89fa09056c22cd42e212c9ce8addeecb5a.tar.gz |
Additions to the client API:
- SubscriptionManager::get(queue) to get a single message from a queue.
- Set FlowControl per-subscription.
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@671655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r-- | cpp/src/Makefile.am | 1 | ||||
-rw-r--r-- | cpp/src/qpid/client/FlowControl.h | 73 | ||||
-rw-r--r-- | cpp/src/qpid/client/LocalQueue.h | 2 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.cpp | 42 | ||||
-rw-r--r-- | cpp/src/qpid/client/SubscriptionManager.h | 55 | ||||
-rw-r--r-- | cpp/src/tests/ClientSessionTest.cpp | 27 | ||||
-rwxr-xr-x | cpp/src/tests/ais_check | 12 | ||||
-rwxr-xr-x | cpp/src/tests/ais_run | 15 | ||||
-rw-r--r-- | cpp/src/tests/cluster.mk | 2 | ||||
-rw-r--r-- | cpp/src/tests/cluster_client.cpp | 28 | ||||
-rwxr-xr-x | cpp/src/tests/start_cluster | 8 |
11 files changed, 227 insertions, 38 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am index 12bca2396b..2496c5f8b0 100644 --- a/cpp/src/Makefile.am +++ b/cpp/src/Makefile.am @@ -444,6 +444,7 @@ nobase_include_HEADERS = \ qpid/client/Demux.h \ qpid/client/Dispatcher.h \ qpid/client/Execution.h \ + qpid/client/FlowControl.h \ qpid/client/Future.h \ qpid/client/FutureCompletion.h \ qpid/client/FutureResult.h \ diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h new file mode 100644 index 0000000000..a4ed9879f4 --- /dev/null +++ b/cpp/src/qpid/client/FlowControl.h @@ -0,0 +1,73 @@ +#ifndef QPID_CLIENT_FLOWCONTROL_H +#define QPID_CLIENT_FLOWCONTROL_H + +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +namespace qpid { +namespace client { + +/** + * Flow control works by associating a finite amount of "credit" + * associated with a subscription. + * + * Credit includes a message count and a byte count. Each message + * received decreases the message count by one, and the byte count by + * the size of the message. Either count can have the special value + * UNLIMITED which is never decreased. + * + * A subscription's credit is exhausted when the message count is 0 or + * the byte count is too small for the next available message. The + * subscription will not receive any further messages until is credit + * is renewed. + * + * In "window mode" credit is automatically renewed when a message is + * acknowledged (@see AckPolicy) In non-window mode credit is not + * automatically renewed, it must be explicitly re-set (@see + * SubscriptionManager) + */ +struct FlowControl { + static const uint32_t UNLIMITED=0xFFFFFFFF; + FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false) + : messages(messages_), bytes(bytes_), window(window_) {} + + static FlowControl messageCredit(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,false); } + static FlowControl messageWindow(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,true); } + static FlowControl byteCredit(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,false); } + static FlowControl byteWindow(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,true); } + static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, false); } + static FlowControl zero() { return FlowControl(0, 0, false); } + + /** Message credit: subscription can accept up to this many messages. */ + uint32_t messages; + /** Byte credit: subscription can accept up to this many bytes of message content. */ + uint32_t bytes; + /** Window mode. If true credit is automatically renewed as messages are acknowledged. */ + bool window; + + bool operator==(const FlowControl& x) { + return messages == x.messages && bytes == x.bytes && window == x.window; + }; +}; + +}} // namespace qpid::client + +#endif /*!QPID_CLIENT_FLOWCONTROL_H*/ diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h index c76d0756eb..273814f179 100644 --- a/cpp/src/qpid/client/LocalQueue.h +++ b/cpp/src/qpid/client/LocalQueue.h @@ -56,7 +56,7 @@ class LocalQueue */ Message pop(); - /** Synonym for get(). */ + /** Synonym for pop(). */ Message get() { return pop(); } /** Return true if local queue is empty. */ diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp index 3fa75a54ac..324b11e1df 100644 --- a/cpp/src/qpid/client/SubscriptionManager.cpp +++ b/cpp/src/qpid/client/SubscriptionManager.cpp @@ -25,6 +25,7 @@ #include <qpid/client/Dispatcher.h> #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> +#include <qpid/framing/Uuid.h> #include <set> #include <sstream> @@ -34,35 +35,48 @@ namespace client { SubscriptionManager::SubscriptionManager(const Session& s) : dispatcher(s), session(s), - messages(UNLIMITED), bytes(UNLIMITED), window(true), + flowControl(UNLIMITED, UNLIMITED, false), acceptMode(0), acquireMode(0), autoStop(true) {} void SubscriptionManager::subscribeInternal( - const std::string& q, const std::string& dest) + const std::string& q, const std::string& dest, const FlowControl& fc) { session.messageSubscribe( arg::queue=q, arg::destination=dest, arg::acceptMode=acceptMode, arg::acquireMode=acquireMode); - setFlowControl(dest, messages, bytes, window); + if (fc.messages || fc.bytes) // No need to set if all 0. + setFlowControl(dest, fc); } void SubscriptionManager::subscribe( MessageListener& listener, const std::string& q, const std::string& d) { + subscribe(listener, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; dispatcher.listen(dest, &listener, autoAck); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::subscribe( LocalQueue& lq, const std::string& q, const std::string& d) { + subscribe(lq, q, getFlowControl(), d); +} + +void SubscriptionManager::subscribe( + LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d) +{ std::string dest=d.empty() ? q:d; lq.session=session; lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest)); - return subscribeInternal(q, dest); + return subscribeInternal(q, dest, fc); } void SubscriptionManager::setFlowControl( @@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl( session.sync(); } +void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) { + setFlowControl(dest, fc.messages, fc.bytes, fc.window); +} + +void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; } + void SubscriptionManager::setFlowControl( uint32_t messages_, uint32_t bytes_, bool window_) { - messages=messages_; - bytes=bytes_; - window=window_; + setFlowControl(FlowControl(messages_, bytes_, window_)); } +const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; } + void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; } void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; } @@ -109,6 +129,12 @@ void SubscriptionManager::stop() dispatcher.stop(); } +Message SubscriptionManager::get(const std::string& queue) { + LocalQueue lq; + subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str()); + return lq.get(); +} + }} // namespace qpid::client #endif diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h index 930175564e..0aa55099f5 100644 --- a/cpp/src/qpid/client/SubscriptionManager.h +++ b/cpp/src/qpid/client/SubscriptionManager.h @@ -27,8 +27,8 @@ #include <qpid/client/Session.h> #include <qpid/client/MessageListener.h> #include <qpid/client/LocalQueue.h> +#include <qpid/client/FlowControl.h> #include <qpid/sys/Runnable.h> - #include <set> #include <sstream> @@ -48,13 +48,11 @@ class SubscriptionManager : public sys::Runnable typedef sys::Mutex::ScopedLock Lock; typedef sys::Mutex::ScopedUnlock Unlock; - void subscribeInternal(const std::string& q, const std::string& dest); + void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&); qpid::client::Dispatcher dispatcher; qpid::client::AsyncSession session; - uint32_t messages; - uint32_t bytes; - bool window; + FlowControl flowControl; AckPolicy autoAck; bool acceptMode; bool acquireMode; @@ -72,6 +70,38 @@ class SubscriptionManager : public sys::Runnable * *@param listener Listener object to receive messages. *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(MessageListener& listener, + const std::string& queue, + const FlowControl& flow, + const std::string& tag=std::string()); + + /** + * Subscribe a LocalQueue to receive messages from queue. + * + * Incoming messages are stored in the queue for you to retrieve. + * + *@param queue Name of the queue to subscribe to. + *@param flow initial FlowControl for the subscription. + *@param tag Unique destination tag for the listener. + * If not specified, the queue name is used. + */ + void subscribe(LocalQueue& localQueue, + const std::string& queue, + const FlowControl& flow, + const std::string& tag=std::string()); + + /** + * Subscribe a MessagesListener to receive messages from queue. + * + * Provide your own subclass of MessagesListener to process + * incoming messages. It will be called for each message received. + * + *@param listener Listener object to receive messages. + *@param queue Name of the queue to subscribe to. *@param tag Unique destination tag for the listener. * If not specified, the queue name is used. */ @@ -92,6 +122,11 @@ class SubscriptionManager : public sys::Runnable const std::string& queue, const std::string& tag=std::string()); + /** + * Get a single message from a queue. + */ + Message get(const std::string& queue); + /** Cancel a subscription. */ void cancel(const std::string tag); @@ -107,9 +142,17 @@ class SubscriptionManager : public sys::Runnable /** Cause run() to return */ void stop(); - static const uint32_t UNLIMITED=0xFFFFFFFF; + /** Set the flow control for destination. */ + void setFlowControl(const std::string& destintion, const FlowControl& flow); + + /** Set the default initial flow control for subscriptions that do not specify it. */ + void setFlowControl(const FlowControl& flow); + + /** Get the default flow control for new subscriptions that do not specify it. */ + const FlowControl& getFlowControl() const; + /** Set the flow control for destination tag. *@param tag: name of the destination. *@param messages: message credit. diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp index 83c3317094..0475350d6a 100644 --- a/cpp/src/tests/ClientSessionTest.cpp +++ b/cpp/src/tests/ClientSessionTest.cpp @@ -203,7 +203,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) { ClientSessionFixture fix; SimpleListener mylistener; fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true); - fix.subs.subscribe(mylistener, "myq", "myq"); + fix.subs.subscribe(mylistener, "myq"); sys::Thread runner(fix.subs);//start dispatcher thread string data("msg"); Message msg(data, "myq"); @@ -222,5 +222,30 @@ QPID_AUTO_TEST_CASE(testSendToSelf) { } } +QPID_AUTO_TEST_CASE(testLocalQueue) { + ClientSessionFixture fix; + fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true); + LocalQueue lq; + fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false)); + fix.session.messageTransfer(content=Message("foo0", "lq")); + fix.session.messageTransfer(content=Message("foo1", "lq")); + fix.session.messageTransfer(content=Message("foo2", "lq")); + BOOST_CHECK_EQUAL("foo0", lq.pop().getData()); + BOOST_CHECK_EQUAL("foo1", lq.pop().getData()); + BOOST_CHECK(lq.empty()); // Credit exhausted. + fix.subs.setFlowControl("lq", FlowControl::unlimited()); + BOOST_CHECK_EQUAL("foo2", lq.pop().getData()); +} + +QPID_AUTO_TEST_CASE(testGet) { + ClientSessionFixture fix; + fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true); + fix.session.messageTransfer(content=Message("foo0", "getq")); + fix.session.messageTransfer(content=Message("foo1", "getq")); + BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData()); + BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData()); +} + QPID_AUTO_TEST_SUITE_END() + diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check index 344f15a94e..f16480b7f0 100755 --- a/cpp/src/tests/ais_check +++ b/cpp/src/tests/ais_check @@ -1,7 +1,6 @@ #!/bin/sh -# Check for requirements, run AIS tests if found. -# +# Check AIS requirements tests if found. id -nG | grep '\<ais\>' >/dev/null || \ NOGROUP="You are not a member of the ais group." ps -u root | grep aisexec >/dev/null || \ @@ -24,4 +23,11 @@ EOF exit 0; # A warning, not a failure. fi -echo `dirname $0`/ais_run | newgrp ais +# Run the tests +srcdir=`dirname $0` +$srcdir/start_cluster 4 +./ais_test +ret=$? +$srcdir/stop_cluster +exit $ret + diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run deleted file mode 100755 index 0f45edc39c..0000000000 --- a/cpp/src/tests/ais_run +++ /dev/null @@ -1,15 +0,0 @@ -#!/bin/sh -# -# Run AIS tests, assumes that ais_check has passed and we are -# running with the ais group ID. -# - -# FIXME aconway 2008-01-30: we should valgrind the cluster brokers. - -srcdir=`dirname $0` -$srcdir/start_cluster 4 -./ais_test -ret=$? -$srcdir/stop_cluster -exit $ret - diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk index d373a7d6ab..ba49a6774d 100644 --- a/cpp/src/tests/cluster.mk +++ b/cpp/src/tests/cluster.mk @@ -11,7 +11,7 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la # ais_check checks conditions for AIS tests and runs if ok. TESTS+=ais_check -EXTRA_DIST+=ais_check ais_run start_cluster stop_cluster +EXTRA_DIST+=ais_check start_cluster stop_cluster check_PROGRAMS+=ais_test ais_test_SOURCES=ais_test.cpp Cpg.cpp diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp index f6b3a80c97..efb6e04aa8 100644 --- a/cpp/src/tests/cluster_client.cpp +++ b/cpp/src/tests/cluster_client.cpp @@ -62,14 +62,14 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { ClusterConnections cluster; BOOST_REQUIRE(cluster.size() > 1); - Session broker0 = cluster[0]->newSession(ASYNC); + Session broker0 = cluster[0]->newSession(); broker0.exchangeDeclare(exchange="ex"); broker0.queueDeclare(queue="q"); broker0.queueBind(exchange="ex", queue="q", routingKey="key"); broker0.close(); for (size_t i = 1; i < cluster.size(); ++i) { - Session s = cluster[i]->newSession(ASYNC); + Session s = cluster[i]->newSession(); s.messageTransfer(content=TransferContent("data", "key", "ex")); s.messageSubscribe(queue="q", destination="q"); s.messageFlow(destination="q", unit=0, value=1);//messages @@ -81,4 +81,28 @@ QPID_AUTO_TEST_CASE(testWiringReplication) { } } +QPID_AUTO_TEST_CASE(testMessageReplication) { + // Enqueue on one broker, dequeue on another. + ClusterConnections cluster; + BOOST_REQUIRE(cluster.size() > 1); + + Session broker0 = cluster[0]->newSession(); + broker0.queueDeclare(queue="q"); + broker0.messageTransfer(content=TransferContent("data", "q")); + broker0.close(); + + Session broker1 = cluster[1]->newSession(); + broker1. + s.messageSubscribe(queue="q", destination="q"); + s.messageFlow(destination="q", unit=0, value=1);//messages + FrameSet::shared_ptr msg = s.get(); + BOOST_CHECK(msg->isA<MessageTransferBody>()); + BOOST_CHECK_EQUAL(string("data"), msg->getContent()); + s.getExecution().completed(msg->getId(), true, true); + cluster[i]->close(); + } +} + +// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover. + QPID_AUTO_TEST_SUITE_END() diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster index 876913bc2d..46ecbad9c5 100755 --- a/cpp/src/tests/start_cluster +++ b/cpp/src/tests/start_cluster @@ -3,6 +3,12 @@ # Print the cluster's URL. # +# Execute command with the ais group set. +with_ais_group() { + id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; } + echo $* | newgrp ais +} + test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; } test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; } @@ -13,7 +19,7 @@ CLUSTER=`whoami` # Cluster name=user name, avoid clashes. OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*" for (( i=0; i<SIZE; ++i )); do - PORT=`../qpidd $OPTS` || exit 1 + PORT=`with_ais_group ../qpidd $OPTS` || exit 1 echo $PORT >> cluster.ports done |