summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2008-07-25 10:24:53 +0000
committerGordon Sim <gsim@apache.org>2008-07-25 10:24:53 +0000
commitd8963856c5eff8523a92aa23a3664a52fa530a02 (patch)
tree57b5964791baacfe54af14433000085a73fcac4b
parent40c4ee7bb08d9202253c976f9f8f61d81d6010d0 (diff)
downloadqpid-python-d8963856c5eff8523a92aa23a3664a52fa530a02.tar.gz
Merged fix to SubscriptionManager (was r679739)
git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/branches/qpid.0-10@679748 13f79535-47bb-0310-9956-ffa450edef68
-rw-r--r--cpp/src/qpid/client/SubscriptionManager.cpp5
-rw-r--r--cpp/src/tests/ClientSessionTest.cpp20
2 files changed, 24 insertions, 1 deletions
diff --git a/cpp/src/qpid/client/SubscriptionManager.cpp b/cpp/src/qpid/client/SubscriptionManager.cpp
index 9bb75f9a49..b4c48f7365 100644
--- a/cpp/src/qpid/client/SubscriptionManager.cpp
+++ b/cpp/src/qpid/client/SubscriptionManager.cpp
@@ -134,8 +134,11 @@ bool SubscriptionManager::get(Message& result, const std::string& queue, sys::Du
std::string unique = framing::Uuid(true).str();
subscribe(lq, queue, FlowControl::messageCredit(1), unique);
AutoCancel ac(*this, unique);
+ //first wait for message to be delivered if a timeout has been specified
+ if (timeout && lq.get(result, timeout)) return true;
+ //make sure message is not on queue before final check
sync(session).messageFlush(unique);
- return lq.get(result, timeout);
+ return lq.get(result, 0);
}
}} // namespace qpid::client
diff --git a/cpp/src/tests/ClientSessionTest.cpp b/cpp/src/tests/ClientSessionTest.cpp
index 5861dec9fc..c41bd63f69 100644
--- a/cpp/src/tests/ClientSessionTest.cpp
+++ b/cpp/src/tests/ClientSessionTest.cpp
@@ -41,6 +41,7 @@ using namespace qpid::client::arg;
using namespace qpid::framing;
using namespace qpid;
using qpid::sys::Monitor;
+using qpid::sys::Thread;
using qpid::sys::TIME_SEC;
using std::string;
using std::cout;
@@ -238,6 +239,19 @@ QPID_AUTO_TEST_CASE(testLocalQueue) {
BOOST_CHECK_EQUAL("foo2", lq.pop().getData());
}
+struct DelayedTransfer : sys::Runnable
+{
+ ClientSessionFixture& fixture;
+
+ DelayedTransfer(ClientSessionFixture& f) : fixture(f) {}
+
+ void run()
+ {
+ sleep(1);
+ fixture.session.messageTransfer(content=Message("foo2", "getq"));
+ }
+};
+
QPID_AUTO_TEST_CASE(testGet) {
ClientSessionFixture fix;
fix.session.queueDeclare(queue="getq", exclusive=true, autoDelete=true);
@@ -249,6 +263,12 @@ QPID_AUTO_TEST_CASE(testGet) {
BOOST_CHECK(fix.subs.get(got, "getq", TIME_SEC));
BOOST_CHECK_EQUAL("foo1", got.getData());
BOOST_CHECK(!fix.subs.get(got, "getq"));
+ DelayedTransfer sender(fix);
+ Thread t(sender);
+ //test timed get where message shows up after a short delay
+ BOOST_CHECK(fix.subs.get(got, "getq", 5*TIME_SEC));
+ BOOST_CHECK_EQUAL("foo2", got.getData());
+ t.join();
}
QPID_AUTO_TEST_SUITE_END()