summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp164
1 files changed, 104 insertions, 60 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
index e6ed4bfc4e..31efff38a6 100644
--- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -19,6 +19,7 @@
*
*/
#include "ReceiverImpl.h"
+#include "AddressResolution.h"
#include "MessageSource.h"
#include "SessionImpl.h"
#include "qpid/messaging/MessageListener.h"
@@ -38,11 +39,6 @@ void ReceiverImpl::received(qpid::messaging::Message&)
window = capacity;
}
}
-
-bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- return parent.get(*this, message, timeout);
-}
qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
{
@@ -50,24 +46,6 @@ qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
return result;
}
-
-bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
-{
- if (capacity == 0 && !cancelled) {
- session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
- if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
- }
-
- if (get(message, timeout)) {
- return true;
- } else {
- if (!cancelled) {
- sync(session).messageFlush(destination);
- start();//reallocate credit
- }
- return get(message, 0);
- }
-}
qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
{
@@ -76,71 +54,137 @@ qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
return result;
}
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Get f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ Fetch f(*this, message, timeout);
+ while (!parent.execute(f)) {}
+ return f.result;
+}
+
void ReceiverImpl::cancel()
{
- if (!cancelled) {
- //TODO: should syncronicity be an optional argument to this call?
- source->cancel(session, destination);
- //need to be sure cancel is complete and all incoming
- //framesets are processed before removing the receiver
- parent.receiverCancelled(destination);
- cancelled = true;
- }
+ execute<Cancel>();
}
void ReceiverImpl::start()
{
- if (!cancelled) {
- started = true;
- session.messageSetFlowMode(destination, capacity > 0);
+ execute<Start>();
+}
+
+void ReceiverImpl::stop()
+{
+ execute<Stop>();
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ execute1<SetCapacity>(c);
+}
+
+void ReceiverImpl::startFlow()
+{
+ if (capacity > 0) {
+ session.messageSetFlowMode(destination, FLOW_MODE_WINDOW);
session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
window = capacity;
}
}
-void ReceiverImpl::stop()
+void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
{
- session.messageStop(destination);
- started = false;
+
+ session = s;
+ if (state == UNRESOLVED) {
+ source = resolver.resolveSource(session, address, filter, options);
+ state = STOPPED;//TODO: if session is started, go straight to started
+ }
+ if (state == CANCELLED) {
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
+ } else {
+ source->subscribe(session, destination);
+ if (state == STARTED) start();
+ }
}
-void ReceiverImpl::subscribe()
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name,
+ const qpid::messaging::Address& a,
+ const qpid::messaging::Filter* f,
+ const qpid::messaging::Variant::Map& o) :
+
+ parent(p), destination(name), address(a), filter(f), options(o), byteCredit(0xFFFFFFFF),
+ state(UNRESOLVED), capacity(0), listener(0), window(0) {}
+
+bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
{
- source->subscribe(session, destination);
+ return parent.get(*this, message, timeout);
}
-void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+bool ReceiverImpl::fetchImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ if (state == CANCELLED) return false;//TODO: or should this be an error?
+
+ if (capacity == 0 || state != STARTED) {
+ session.messageSetFlowMode(destination, FLOW_MODE_CREDIT);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, 0xFFFFFFFF);
+ }
+
+ if (getImpl(message, timeout)) {
+ return true;
+ } else {
+ sync(session).messageFlush(destination);
+ startFlow();//reallocate credit
+ return getImpl(message, 0);
+ }
+}
+
+void ReceiverImpl::cancelImpl()
{
- session = s;
- if (!cancelled) {
- subscribe();
- //if we were in started state before the session was changed,
- //start again on this new session
- //TODO: locking if receiver is to be threadsafe...
- if (started) start();
+ if (state != CANCELLED) {
+ state = CANCELLED;
+ source->cancel(session, destination);
+ parent.receiverCancelled(destination);
}
}
-void ReceiverImpl::setCapacity(uint32_t c)
+void ReceiverImpl::startImpl()
+{
+ if (state == STOPPED) {
+ state = STARTED;
+ startFlow();
+ }
+}
+
+void ReceiverImpl::stopImpl()
+{
+ state = STOPPED;
+ session.messageStop(destination);
+}
+
+void ReceiverImpl::setCapacityImpl(uint32_t c)
{
if (c != capacity) {
capacity = c;
- if (!cancelled && started) {
- stop();
- start();
+ if (state == STARTED) {
+ session.messageStop(destination);
+ startFlow();
}
}
}
-void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
-qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
-
-const std::string& ReceiverImpl::getName() const { return destination; }
-
-ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) :
- parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF),
- capacity(0), started(false), cancelled(false), listener(0), window(0) {}
-
}}} // namespace qpid::client::amqp0_10