From 00e14b01033ae8c33399bc6ebfd28930498b2533 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Wed, 7 Apr 2010 19:41:44 +0000 Subject: QPID-664: removed flush, added option to make sync non-blocking if so desired git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@931651 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/SessionBase_0_10.cpp | 7 +++++++ cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 2 +- cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 23 +++++++---------------- cpp/src/qpid/client/amqp0_10/SessionImpl.h | 14 ++++++-------- 4 files changed, 21 insertions(+), 25 deletions(-) (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/SessionBase_0_10.cpp b/cpp/src/qpid/client/SessionBase_0_10.cpp index e114b7aacc..6aa13bb579 100644 --- a/cpp/src/qpid/client/SessionBase_0_10.cpp +++ b/cpp/src/qpid/client/SessionBase_0_10.cpp @@ -65,6 +65,13 @@ void SessionBase_0_10::sendCompletion() impl->sendCompletion(); } +void SessionBase_0_10::sendSyncRequest() +{ + ExecutionSyncBody b; + b.setSync(true); + impl->send(b); +} + uint16_t SessionBase_0_10::getChannel() const { return impl->getChannel(); } void SessionBase_0_10::suspend() { impl->suspend(); } diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e8c106976f..522c93a552 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -43,7 +43,7 @@ void SenderImpl::send(const qpid::messaging::Message& message, bool sync) Send f(*this, &message); while (f.repeat) parent->execute(f); } - if (sync) parent->sync(); + if (sync) parent->sync(true); } void SenderImpl::close() diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 9efafb1d16..8f9751a967 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -51,14 +51,10 @@ namespace amqp0_10 { SessionImpl::SessionImpl(ConnectionImpl& c, bool t) : connection(&c), transactional(t) {} -void SessionImpl::sync() +void SessionImpl::sync(bool block) { - retry(); -} - -void SessionImpl::flush() -{ - retry(); + if (block) retry(); + else execute(); } void SessionImpl::commit() @@ -82,7 +78,7 @@ void SessionImpl::acknowledge(bool sync_) //message may be redelivered; i.e. the application cannot delete //any state necessary for preventing reprocessing of the message execute(); - if (sync_) sync(); + if (sync_) sync(true); } void SessionImpl::reject(qpid::messaging::Message& m) @@ -378,17 +374,12 @@ uint32_t SessionImpl::pendingAckImpl(const std::string* destination) } } -void SessionImpl::syncImpl() -{ - session.sync(); -} - -void SessionImpl::flushImpl() +void SessionImpl::syncImpl(bool block) { - session.flush(); + if (block) session.sync(); + else session.sendSyncRequest(); } - void SessionImpl::commitImpl() { incoming.accept(); diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index 5f9b23fb14..8b098e65d6 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -62,8 +62,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void reject(qpid::messaging::Message&); void release(qpid::messaging::Message&); void close(); - void sync(); - void flush(); + void sync(bool block); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); @@ -126,8 +125,7 @@ class SessionImpl : public qpid::messaging::SessionImpl void rejectImpl(qpid::messaging::Message&); void releaseImpl(qpid::messaging::Message&); void closeImpl(); - void syncImpl(); - void flushImpl(); + void syncImpl(bool block); qpid::messaging::Sender createSenderImpl(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiverImpl(const qpid::messaging::Address& address); uint32_t availableImpl(const std::string* destination); @@ -163,13 +161,13 @@ class SessionImpl : public qpid::messaging::SessionImpl struct Sync : Command { Sync(SessionImpl& i) : Command(i) {} - void operator()() { impl.syncImpl(); } + void operator()() { impl.syncImpl(true); } }; - struct Flush : Command + struct NonBlockingSync : Command { - Flush(SessionImpl& i) : Command(i) {} - void operator()() { impl.flushImpl(); } + NonBlockingSync(SessionImpl& i) : Command(i) {} + void operator()() { impl.syncImpl(false); } }; struct Reject : Command -- cgit v1.2.1