diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 164 |
1 files changed, 104 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index e6ed4bfc4e..31efff38a6 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -19,6 +19,7 @@ * */ #include "ReceiverImpl.h" +#include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" #include "qpid/messaging/MessageListener.h" @@ -38,11 +39,6 @@ void ReceiverImpl::received(qpid::messaging::Message&) window = capacity; } } - -bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - return parent.get(*this, message, timeout); -} qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) { @@ -50,24 +46,6 @@ qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout) if (!get(result, timeout)) throw Receiver::NoMessageAvailable(); return result; } - -bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - if (capacity == 0 && !cancelled) { - session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); - if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); - } - - if (get(message, timeout)) { - return true; - } else { - if (!cancelled) { - sync(session).messageFlush(destination); - start();//reallocate credit - } - return get(message, 0); - } -} qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) { @@ -76,71 +54,137 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout) return result; } +bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Get f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + +bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + Fetch f(*this, message, timeout); + while (!parent.execute(f)) {} + return f.result; +} + void ReceiverImpl::cancel() { - if (!cancelled) { - //TODO: should syncronicity be an optional argument to this call? - source->cancel(session, destination); - //need to be sure cancel is complete and all incoming - //framesets are processed before removing the receiver - parent.receiverCancelled(destination); - cancelled = true; - } + execute<Cancel>(); } void ReceiverImpl::start() { - if (!cancelled) { - started = true; - session.messageSetFlowMode(destination, capacity > 0); + execute<Start>(); +} + +void ReceiverImpl::stop() +{ + execute<Stop>(); +} + +void ReceiverImpl::setCapacity(uint32_t c) +{ + execute1<SetCapacity>(c); +} + +void ReceiverImpl::startFlow() +{ + if (capacity > 0) { + session.messageSetFlowMode(destination, FLOW_MODE_WINDOW); session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity); session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit); window = capacity; } } -void ReceiverImpl::stop() +void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) { - session.messageStop(destination); - started = false; + + session = s; + if (state == UNRESOLVED) { + source = resolver.resolveSource(session, address, filter, options); + state = STOPPED;//TODO: if session is started, go straight to started + } + if (state == CANCELLED) { + source->cancel(session, destination); + parent.receiverCancelled(destination); + } else { + source->subscribe(session, destination); + if (state == STARTED) start(); + } } -void ReceiverImpl::subscribe() +void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } +qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } + +const std::string& ReceiverImpl::getName() const { return destination; } + +ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, + const qpid::messaging::Address& a, + const qpid::messaging::Filter* f, + const qpid::messaging::Variant::Map& o) : + + parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF), + state(UNRESOLVED), capacity(0), listener(0), window(0) {} + +bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) { - source->subscribe(session, destination); + return parent.get(*this, message, timeout); } -void ReceiverImpl::setSession(qpid::client::AsyncSession s) +bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) +{ + if (state == CANCELLED) return false;//TODO: or should this be an error? + + if (capacity == 0 || state != STARTED) { + session.messageSetFlowMode(destination, FLOW_MODE_CREDIT); + session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1); + session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF); + } + + if (getImpl(message, timeout)) { + return true; + } else { + sync(session).messageFlush(destination); + startFlow();//reallocate credit + return getImpl(message, 0); + } +} + +void ReceiverImpl::cancelImpl() { - session = s; - if (!cancelled) { - subscribe(); - //if we were in started state before the session was changed, - //start again on this new session - //TODO: locking if receiver is to be threadsafe... - if (started) start(); + if (state != CANCELLED) { + state = CANCELLED; + source->cancel(session, destination); + parent.receiverCancelled(destination); } } -void ReceiverImpl::setCapacity(uint32_t c) +void ReceiverImpl::startImpl() +{ + if (state == STOPPED) { + state = STARTED; + startFlow(); + } +} + +void ReceiverImpl::stopImpl() +{ + state = STOPPED; + session.messageStop(destination); +} + +void ReceiverImpl::setCapacityImpl(uint32_t c) { if (c != capacity) { capacity = c; - if (!cancelled && started) { - stop(); - start(); + if (state == STARTED) { + session.messageStop(destination); + startFlow(); } } } -void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } -qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } - -const std::string& ReceiverImpl::getName() const { return destination; } - -ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) : - parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF), - capacity(0), started(false), cancelled(false), listener(0), window(0) {} - }}} // namespace qpid::client::amqp0_10 |
