summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
authorAlan Conway <aconway@apache.org>2008-06-25 20:51:30 +0000
committerAlan Conway <aconway@apache.org>2008-06-25 20:51:30 +0000
commit4d560b89fa09056c22cd42e212c9ce8addeecb5a (patch)
tree3a15eb32ca85193ed5d2b97b5c6e7a6f053fb5f1 /cpp
parent830943be4ed6ae90edd2e2655720c0dcc721171d (diff)
downloadqpid-python-4d560b89fa09056c22cd42e212c9ce8addeecb5a.tar.gz
Additions to the client API:
- SubscriptionManager::get(queue) to get a single message from a queue. - Set FlowControl per-subscription. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@671655 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/Makefile.am1
-rw-r--r--cpp/src/qpid/client/FlowControl.h73
-rw-r--r--cpp/src/qpid/client/LocalQueue.h2
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp42
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.h55
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp27
-rwxr-xr-xcpp/src/tests/ais_check12
-rwxr-xr-xcpp/src/tests/ais_run15
-rw-r--r--cpp/src/tests/cluster.mk2
-rw-r--r--cpp/src/tests/cluster_client.cpp28
-rwxr-xr-xcpp/src/tests/start_cluster8
11 files changed, 227 insertions, 38 deletions
diff --git a/cpp/src/Makefile.am b/cpp/src/Makefile.am
index 12bca2396b..2496c5f8b0 100644
--- a/cpp/src/Makefile.am
+++ b/cpp/src/Makefile.am
@@ -444,6 +444,7 @@ nobase_include_HEADERS = \
qpid/client/Demux.h \
qpid/client/Dispatcher.h \
qpid/client/Execution.h \
+ qpid/client/FlowControl.h \
qpid/client/Future.h \
qpid/client/FutureCompletion.h \
qpid/client/FutureResult.h \
diff --git a/cpp/src/qpid/client/FlowControl.h b/cpp/src/qpid/client/FlowControl.h
new file mode 100644
index 0000000000..a4ed9879f4
--- /dev/null
+++ b/cpp/src/qpid/client/FlowControl.h
@@ -0,0 +1,73 @@
+#ifndef QPID_CLIENT_FLOWCONTROL_H
+#define QPID_CLIENT_FLOWCONTROL_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+namespace qpid {
+namespace client {
+
+/**
+ * Flow control works by associating a finite amount of "credit"
+ * associated with a subscription.
+ *
+ * Credit includes a message count and a byte count. Each message
+ * received decreases the message count by one, and the byte count by
+ * the size of the message. Either count can have the special value
+ * UNLIMITED which is never decreased.
+ *
+ * A subscription's credit is exhausted when the message count is 0 or
+ * the byte count is too small for the next available message. The
+ * subscription will not receive any further messages until is credit
+ * is renewed.
+ *
+ * In "window mode" credit is automatically renewed when a message is
+ * acknowledged (@see AckPolicy) In non-window mode credit is not
+ * automatically renewed, it must be explicitly re-set (@see
+ * SubscriptionManager)
+ */
+struct FlowControl {
+ static const uint32_t UNLIMITED=0xFFFFFFFF;
+ FlowControl(uint32_t messages_=0, uint32_t bytes_=0, bool window_=false)
+ : messages(messages_), bytes(bytes_), window(window_) {}
+
+ static FlowControl messageCredit(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,false); }
+ static FlowControl messageWindow(uint32_t messages_) { return FlowControl(messages_,UNLIMITED,true); }
+ static FlowControl byteCredit(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,false); }
+ static FlowControl byteWindow(uint32_t bytes_) { return FlowControl(UNLIMITED,bytes_,true); }
+ static FlowControl unlimited() { return FlowControl(UNLIMITED, UNLIMITED, false); }
+ static FlowControl zero() { return FlowControl(0, 0, false); }
+
+ /** Message credit: subscription can accept up to this many messages. */
+ uint32_t messages;
+ /** Byte credit: subscription can accept up to this many bytes of message content. */
+ uint32_t bytes;
+ /** Window mode. If true credit is automatically renewed as messages are acknowledged. */
+ bool window;
+
+ bool operator==(const FlowControl& x) {
+ return messages == x.messages && bytes == x.bytes && window == x.window;
+ };
+};
+
+}} // namespace qpid::client
+
+#endif /*!QPID_CLIENT_FLOWCONTROL_H*/
diff --git a/cpp/src/qpid/client/LocalQueue.h b/cpp/src/qpid/client/LocalQueue.h
index c76d0756eb..273814f179 100644
--- a/cpp/src/qpid/client/LocalQueue.h
+++ b/cpp/src/qpid/client/LocalQueue.h
@@ -56,7 +56,7 @@ class LocalQueue
*/
Message pop();
- /** Synonym for get(). */
+ /** Synonym for pop(). */
Message get() { return pop(); }
/** Return true if local queue is empty. */
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 3fa75a54ac..324b11e1df 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -25,6 +25,7 @@
#include <qpid/client/Dispatcher.h>
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
+#include <qpid/framing/Uuid.h>
#include <set>
#include <sstream>
@@ -34,35 +35,48 @@ namespace client {
SubscriptionManager::SubscriptionManager(const Session& s)
: dispatcher(s), session(s),
- messages(UNLIMITED), bytes(UNLIMITED), window(true),
+ flowControl(UNLIMITED, UNLIMITED, false),
acceptMode(0), acquireMode(0),
autoStop(true)
{}
void SubscriptionManager::subscribeInternal(
- const std::string& q, const std::string& dest)
+ const std::string& q, const std::string& dest, const FlowControl& fc)
{
session.messageSubscribe(
arg::queue=q, arg::destination=dest,
arg::acceptMode=acceptMode, arg::acquireMode=acquireMode);
- setFlowControl(dest, messages, bytes, window);
+ if (fc.messages || fc.bytes) // No need to set if all 0.
+ setFlowControl(dest, fc);
}
void SubscriptionManager::subscribe(
MessageListener& listener, const std::string& q, const std::string& d)
{
+ subscribe(listener, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ MessageListener& listener, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
dispatcher.listen(dest, &listener, autoAck);
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::subscribe(
LocalQueue& lq, const std::string& q, const std::string& d)
{
+ subscribe(lq, q, getFlowControl(), d);
+}
+
+void SubscriptionManager::subscribe(
+ LocalQueue& lq, const std::string& q, const FlowControl& fc, const std::string& d)
+{
std::string dest=d.empty() ? q:d;
lq.session=session;
lq.queue=session.getExecution().getDemux().add(dest, ByTransferDest(dest));
- return subscribeInternal(q, dest);
+ return subscribeInternal(q, dest, fc);
}
void SubscriptionManager::setFlowControl(
@@ -74,14 +88,20 @@ void SubscriptionManager::setFlowControl(
session.sync();
}
+void SubscriptionManager::setFlowControl(const std::string& dest, const FlowControl& fc) {
+ setFlowControl(dest, fc.messages, fc.bytes, fc.window);
+}
+
+void SubscriptionManager::setFlowControl(const FlowControl& fc) { flowControl=fc; }
+
void SubscriptionManager::setFlowControl(
uint32_t messages_, uint32_t bytes_, bool window_)
{
- messages=messages_;
- bytes=bytes_;
- window=window_;
+ setFlowControl(FlowControl(messages_, bytes_, window_));
}
+const FlowControl& SubscriptionManager::getFlowControl() const { return flowControl; }
+
void SubscriptionManager::setAcceptMode(bool c) { acceptMode=c; }
void SubscriptionManager::setAcquireMode(bool a) { acquireMode=a; }
@@ -109,6 +129,12 @@ void SubscriptionManager::stop()
dispatcher.stop();
}
+Message SubscriptionManager::get(const std::string& queue) {
+ LocalQueue lq;
+ subscribe(lq, queue, FlowControl::messageCredit(1), framing::Uuid(true).str());
+ return lq.get();
+}
+
}} // namespace qpid::client
#endif
diff --git a/cpp/src/qpid/client/SubscriptionManager.h b/cpp/src/qpid/client/SubscriptionManager.h
index 930175564e..0aa55099f5 100644
--- a/cpp/src/qpid/client/SubscriptionManager.h
+++ b/cpp/src/qpid/client/SubscriptionManager.h
@@ -27,8 +27,8 @@
#include <qpid/client/Session.h>
#include <qpid/client/MessageListener.h>
#include <qpid/client/LocalQueue.h>
+#include <qpid/client/FlowControl.h>
#include <qpid/sys/Runnable.h>
-
#include <set>
#include <sstream>
@@ -48,13 +48,11 @@ class SubscriptionManager : public sys::Runnable
typedef sys::Mutex::ScopedLock Lock;
typedef sys::Mutex::ScopedUnlock Unlock;
- void subscribeInternal(const std::string& q, const std::string& dest);
+ void subscribeInternal(const std::string& q, const std::string& dest, const FlowControl&);
qpid::client::Dispatcher dispatcher;
qpid::client::AsyncSession session;
- uint32_t messages;
- uint32_t bytes;
- bool window;
+ FlowControl flowControl;
AckPolicy autoAck;
bool acceptMode;
bool acquireMode;
@@ -72,6 +70,38 @@ class SubscriptionManager : public sys::Runnable
*
*@param listener Listener object to receive messages.
*@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(MessageListener& listener,
+ const std::string& queue,
+ const FlowControl& flow,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a LocalQueue to receive messages from queue.
+ *
+ * Incoming messages are stored in the queue for you to retrieve.
+ *
+ *@param queue Name of the queue to subscribe to.
+ *@param flow initial FlowControl for the subscription.
+ *@param tag Unique destination tag for the listener.
+ * If not specified, the queue name is used.
+ */
+ void subscribe(LocalQueue& localQueue,
+ const std::string& queue,
+ const FlowControl& flow,
+ const std::string& tag=std::string());
+
+ /**
+ * Subscribe a MessagesListener to receive messages from queue.
+ *
+ * Provide your own subclass of MessagesListener to process
+ * incoming messages. It will be called for each message received.
+ *
+ *@param listener Listener object to receive messages.
+ *@param queue Name of the queue to subscribe to.
*@param tag Unique destination tag for the listener.
* If not specified, the queue name is used.
*/
@@ -92,6 +122,11 @@ class SubscriptionManager : public sys::Runnable
const std::string& queue,
const std::string& tag=std::string());
+ /**
+ * Get a single message from a queue.
+ */
+ Message get(const std::string& queue);
+
/** Cancel a subscription. */
void cancel(const std::string tag);
@@ -107,9 +142,17 @@ class SubscriptionManager : public sys::Runnable
/** Cause run() to return */
void stop();
-
static const uint32_t UNLIMITED=0xFFFFFFFF;
+ /** Set the flow control for destination. */
+ void setFlowControl(const std::string& destintion, const FlowControl& flow);
+
+ /** Set the default initial flow control for subscriptions that do not specify it. */
+ void setFlowControl(const FlowControl& flow);
+
+ /** Get the default flow control for new subscriptions that do not specify it. */
+ const FlowControl& getFlowControl() const;
+
/** Set the flow control for destination tag.
*@param tag: name of the destination.
*@param messages: message credit.
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 83c3317094..0475350d6a 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -203,7 +203,7 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {
ClientSessionFixture fix;
SimpleListener mylistener;
fix.session.queueDeclare(queue="myq", exclusive=true, autoDelete=true);
- fix.subs.subscribe(mylistener, "myq", "myq");
+ fix.subs.subscribe(mylistener, "myq");
sys::Thread runner(fix.subs);//start dispatcher thread
string data("msg");
Message msg(data, "myq");
@@ -222,5 +222,30 @@ QPID_AUTO_TEST_CASE(testSendToSelf) {
}
}
+QPID_AUTO_TEST_CASE(testLocalQueue) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(queue="lq", exclusive=true, autoDelete=true);
+ LocalQueue lq;
+ fix.subs.subscribe(lq, "lq", FlowControl(2, FlowControl::UNLIMITED, false));
+ fix.session.messageTransfer(content=Message("foo0", "lq"));
+ fix.session.messageTransfer(content=Message("foo1", "lq"));
+ fix.session.messageTransfer(content=Message("foo2", "lq"));
+ BOOST_CHECK_EQUAL("foo0", lq.pop().getData());
+ BOOST_CHECK_EQUAL("foo1", lq.pop().getData());
+ BOOST_CHECK(lq.empty()); // Credit exhausted.
+ fix.subs.setFlowControl("lq", FlowControl::unlimited());
+ BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
+}
+
+QPID_AUTO_TEST_CASE(testGet) {
+ ClientSessionFixture fix;
+ fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
+ fix.session.messageTransfer(content=Message("foo0", "getq"));
+ fix.session.messageTransfer(content=Message("foo1", "getq"));
+ BOOST_CHECK_EQUAL("foo0", fix.subs.get("getq").getData());
+ BOOST_CHECK_EQUAL("foo1", fix.subs.get("getq").getData());
+}
+
QPID_AUTO_TEST_SUITE_END()
+
diff --git a/cpp/src/tests/ais_check b/cpp/src/tests/ais_check
index 344f15a94e..f16480b7f0 100755
--- a/cpp/src/tests/ais_check
+++ b/cpp/src/tests/ais_check
@@ -1,7 +1,6 @@
#!/bin/sh
-# Check for requirements, run AIS tests if found.
-#
+# Check AIS requirements tests if found.
id -nG | grep '\<ais\>' >/dev/null || \
NOGROUP="You are not a member of the ais group."
ps -u root | grep aisexec >/dev/null || \
@@ -24,4 +23,11 @@ EOF
exit 0; # A warning, not a failure.
fi
-echo `dirname $0`/ais_run | newgrp ais
+# Run the tests
+srcdir=`dirname $0`
+$srcdir/start_cluster 4
+./ais_test
+ret=$?
+$srcdir/stop_cluster
+exit $ret
+
diff --git a/cpp/src/tests/ais_run b/cpp/src/tests/ais_run
deleted file mode 100755
index 0f45edc39c..0000000000
--- a/cpp/src/tests/ais_run
+++ /dev/null
@@ -1,15 +0,0 @@
-#!/bin/sh
-#
-# Run AIS tests, assumes that ais_check has passed and we are
-# running with the ais group ID.
-#
-
-# FIXME aconway 2008-01-30: we should valgrind the cluster brokers.
-
-srcdir=`dirname $0`
-$srcdir/start_cluster 4
-./ais_test
-ret=$?
-$srcdir/stop_cluster
-exit $ret
-
diff --git a/cpp/src/tests/cluster.mk b/cpp/src/tests/cluster.mk
index d373a7d6ab..ba49a6774d 100644
--- a/cpp/src/tests/cluster.mk
+++ b/cpp/src/tests/cluster.mk
@@ -11,7 +11,7 @@ lib_cluster = $(abs_builddir)/../libqpidcluster.la
# ais_check checks conditions for AIS tests and runs if ok.
TESTS+=ais_check
-EXTRA_DIST+=ais_check ais_run start_cluster stop_cluster
+EXTRA_DIST+=ais_check start_cluster stop_cluster
check_PROGRAMS+=ais_test
ais_test_SOURCES=ais_test.cpp Cpg.cpp
diff --git a/cpp/src/tests/cluster_client.cpp b/cpp/src/tests/cluster_client.cpp
index f6b3a80c97..efb6e04aa8 100644
--- a/cpp/src/tests/cluster_client.cpp
+++ b/cpp/src/tests/cluster_client.cpp
@@ -62,14 +62,14 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
ClusterConnections cluster;
BOOST_REQUIRE(cluster.size() > 1);
- Session broker0 = cluster[0]->newSession(ASYNC);
+ Session broker0 = cluster[0]->newSession();
broker0.exchangeDeclare(exchange="ex");
broker0.queueDeclare(queue="q");
broker0.queueBind(exchange="ex", queue="q", routingKey="key");
broker0.close();
for (size_t i = 1; i < cluster.size(); ++i) {
- Session s = cluster[i]->newSession(ASYNC);
+ Session s = cluster[i]->newSession();
s.messageTransfer(content=TransferContent("data", "key", "ex"));
s.messageSubscribe(queue="q", destination="q");
s.messageFlow(destination="q", unit=0, value=1);//messages
@@ -81,4 +81,28 @@ QPID_AUTO_TEST_CASE(testWiringReplication) {
}
}
+QPID_AUTO_TEST_CASE(testMessageReplication) {
+ // Enqueue on one broker, dequeue on another.
+ ClusterConnections cluster;
+ BOOST_REQUIRE(cluster.size() > 1);
+
+ Session broker0 = cluster[0]->newSession();
+ broker0.queueDeclare(queue="q");
+ broker0.messageTransfer(content=TransferContent("data", "q"));
+ broker0.close();
+
+ Session broker1 = cluster[1]->newSession();
+ broker1.
+ s.messageSubscribe(queue="q", destination="q");
+ s.messageFlow(destination="q", unit=0, value=1);//messages
+ FrameSet::shared_ptr msg = s.get();
+ BOOST_CHECK(msg->isA<MessageTransferBody>());
+ BOOST_CHECK_EQUAL(string("data"), msg->getContent());
+ s.getExecution().completed(msg->getId(), true, true);
+ cluster[i]->close();
+ }
+}
+
+// TODO aconway 2008-06-25: dequeue replication, exactly once delivery, failover.
+
QPID_AUTO_TEST_SUITE_END()
diff --git a/cpp/src/tests/start_cluster b/cpp/src/tests/start_cluster
index 876913bc2d..46ecbad9c5 100755
--- a/cpp/src/tests/start_cluster
+++ b/cpp/src/tests/start_cluster
@@ -3,6 +3,12 @@
# Print the cluster's URL.
#
+# Execute command with the ais group set.
+with_ais_group() {
+ id -nG | grep '\<ais\>' >/dev/null || { echo "You are not a member of the ais group."; exit 1; }
+ echo $* | newgrp ais
+}
+
test -f cluster.ports && { echo "cluster.ports file already exists" ; exit 1; }
test -z "$*" && { echo "Usage: $0 cluster-size [options]"; exit 1; }
@@ -13,7 +19,7 @@ CLUSTER=`whoami` # Cluster name=user name, avoid clashes.
OPTS="--load-module ../.libs/libqpidcluster.so -dp0 --log-output=cluster$i.log --cluster-name=$CLUSTER --no-data-dir --auth=no $*"
for (( i=0; i<SIZE; ++i )); do
- PORT=`../qpidd $OPTS` || exit 1
+ PORT=`with_ais_group ../qpidd $OPTS` || exit 1
echo $PORT >> cluster.ports
done