summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp29
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h4
-rw-r--r--qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp20
3 files changed, 45 insertions, 8 deletions
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 27a2107702..abf88c89c5 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -148,11 +148,24 @@ bool IncomingMessages::get(Handler& handler, qpid::sys::Duration timeout)
ScopedRelease release(inUse, lock);
sys::Mutex::ScopedUnlock l(lock);
//wait for suitable new message to arrive
- return process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline));
+ if (process(&handler, timeout == qpid::sys::TIME_INFINITE ? qpid::sys::TIME_INFINITE : qpid::sys::Duration(AbsTime::now(), deadline))) {
+ return true;
+ }
}
+ if (handler.isClosed()) throw qpid::messaging::ReceiverError("Receiver has been closed");
} while (AbsTime::now() < deadline);
return false;
}
+namespace {
+struct Wakeup : public qpid::types::Exception {};
+}
+
+void IncomingMessages::wakeup()
+{
+ sys::Mutex::ScopedLock l(lock);
+ incoming->close(qpid::sys::ExceptionHolder(new Wakeup()));
+ lock.notifyAll();
+}
bool IncomingMessages::getNextDestination(std::string& destination, qpid::sys::Duration timeout)
{
@@ -222,6 +235,16 @@ void IncomingMessages::releasePending(const std::string& destination)
session.messageRelease(match.ids);
}
+bool IncomingMessages::pop(FrameSet::shared_ptr& content, qpid::sys::Duration timeout)
+{
+ try {
+ return incoming->pop(content, timeout);
+ } catch (const Wakeup&) {
+ incoming->open();
+ return false;
+ }
+}
+
/**
* Get a frameset that is accepted by the specified handler from
* session queue, waiting for up to the specified duration and
@@ -234,7 +257,7 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
try {
- for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
MessageTransfer transfer(content, *this);
if (handler && handler->accept(transfer)) {
@@ -261,7 +284,7 @@ bool IncomingMessages::wait(qpid::sys::Duration duration)
{
AbsTime deadline(AbsTime::now(), duration);
FrameSet::shared_ptr content;
- for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ for (Duration timeout = duration; pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
if (content->isA<MessageTransferBody>()) {
QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
sys::Mutex::ScopedLock l(lock);
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index 0f1bbd2720..ff1fb37f3d 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/qpid/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -66,11 +66,13 @@ class IncomingMessages
{
virtual ~Handler() {}
virtual bool accept(MessageTransfer& transfer) = 0;
+ virtual bool isClosed() { return false; }
};
IncomingMessages();
void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
+ void wakeup();
bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
void accept(qpid::framing::SequenceNumber id, bool cumulative);
@@ -94,6 +96,8 @@ class IncomingMessages
bool process(Handler*, qpid::sys::Duration);
bool wait(qpid::sys::Duration);
+ bool pop(FrameSetPtr&, qpid::sys::Duration);
+
void retrieve(FrameSetPtr, qpid::messaging::Message*);
};
diff --git a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 71527bc323..e43abb1a56 100644
--- a/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/qpid/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -276,13 +276,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler
{
typedef boost::function1<bool, IncomingMessages::MessageTransfer&> Callback;
Callback callback;
+ ReceiverImpl* receiver;
- IncomingMessageHandler(Callback c) : callback(c) {}
+ IncomingMessageHandler(Callback c) : callback(c), receiver(0) {}
bool accept(IncomingMessages::MessageTransfer& transfer)
{
return callback(transfer);
}
+
+ bool isClosed()
+ {
+ return receiver && receiver->isClosed();
+ }
};
}
@@ -332,6 +338,7 @@ bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::messagin
bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::messaging::Duration timeout)
{
IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1));
+ handler.receiver = &receiver;
return getIncoming(handler, timeout);
}
@@ -495,10 +502,13 @@ void SessionImpl::releaseImpl(qpid::messaging::Message& m)
void SessionImpl::receiverCancelled(const std::string& name)
{
- ScopedLock l(lock);
- receivers.erase(name);
- session.sync();
- incoming.releasePending(name);
+ {
+ ScopedLock l(lock);
+ receivers.erase(name);
+ session.sync();
+ incoming.releasePending(name);
+ }
+ incoming.wakeup();
}
void SessionImpl::releasePending(const std::string& name)