diff options
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp | 29 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h | 4 | ||||
| -rw-r--r-- | qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 20 |
3 files changed, 45 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp index 27a2107702..abf88c89c5 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp @@ -148,11 +148,24 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout) ScopedRelease release(inUse, lock); sys::Mutex::ScopedUnlock l(lock); //wait for suitable new message to arrive - return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline)); + if (process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline))) { + return true; + } } + if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed"); } while (AbsTime::now() < deadline); return false; } +namespace { +struct Wakeup : public qpid::types::Exception {}; +} + +void IncomingMessages::wakeup() +{ + sys::Mutex::ScopedLock l(lock); + incoming->close(qpid::sys::ExceptionHolder(new Wakeup())); + lock.notifyAll(); +} bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout) { @@ -222,6 +235,16 @@ void IncomingMessages::releasePending(const std::string& destination) session.messageRelease(match.ids); } +bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout) +{ + try { + return incoming->pop(content, timeout); + } catch (const Wakeup&) { + incoming->open(); + return false; + } +} + /** * Get a frameset that is accepted by the specified handler from * session queue, waiting for up to the specified duration and @@ -234,7 +257,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration) AbsTime deadline(AbsTime::now(), duration); FrameSet::shared_ptr content; try { - for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { MessageTransfer transfer(content, *this); if (handler && handler->accept(transfer)) { @@ -261,7 +284,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration) { AbsTime deadline(AbsTime::now(), duration); FrameSet::shared_ptr content; - for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { + for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) { if (content->isA<MessageTransferBody>()) { QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue"); sys::Mutex::ScopedLock l(lock); diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h index 0f1bbd2720..ff1fb37f3d 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h +++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h @@ -66,11 +66,13 @@ class IncomingMessages { virtual ~Handler() {} virtual bool accept(MessageTransfer& transfer) = 0; + virtual bool isClosed() { return false; } }; IncomingMessages(); void setSession(qpid::client::AsyncSession session); bool get(Handler& handler, qpid::sys::Duration timeout); + void wakeup(); bool getNextDestination(std::string& destination, qpid::sys::Duration timeout); void accept(); void accept(qpid::framing::SequenceNumber id, bool cumulative); @@ -94,6 +96,8 @@ class IncomingMessages bool process(Handler*, qpid::sys::Duration); bool wait(qpid::sys::Duration); + bool pop(FrameSetPtr&, qpid::sys::Duration); + void retrieve(FrameSetPtr, qpid::messaging::Message*); }; diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 71527bc323..e43abb1a56 100644 --- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -276,13 +276,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler { typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback; Callback callback; + ReceiverImpl* receiver; - IncomingMessageHandler(Callback c) : callback(c) {} + IncomingMessageHandler(Callback c) : callback(c), receiver(0) {} bool accept(IncomingMessages::MessageTransfer& transfer) { return callback(transfer); } + + bool isClosed() + { + return receiver && receiver->isClosed(); + } }; } @@ -332,6 +338,7 @@ bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messagin bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout) { IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); + handler.receiver = &receiver; return getIncoming(handler, timeout); } @@ -495,10 +502,13 @@ void SessionImpl::releaseImpl(qpid::messaging::Message& m) void SessionImpl::receiverCancelled(const std::string& name) { - ScopedLock l(lock); - receivers.erase(name); - session.sync(); - incoming.releasePending(name); + { + ScopedLock l(lock); + receivers.erase(name); + session.sync(); + incoming.releasePending(name); + } + incoming.wakeup(); } void SessionImpl::releasePending(const std::string& name) |
