summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-08-25 17:57:34 +0000
committerGordon Sim <gsim@apache.org>2009-08-25 17:57:34 +0000
commit082fa377137d1a73382a0c3f1ab22b5abe6cb485 (patch)
tree27375051e0f05a91ff63f123b2b027916840221c /cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
parent28e1de98b115ebc834a1e232bfd630809689a59e (diff)
downloadqpid-python-082fa377137d1a73382a0c3f1ab22b5abe6cb485.tar.gz
QPID-664: Initial checkin of high level messaging api for c++
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@807731 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp146
1 files changed, 146 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
new file mode 100644
index 0000000000..e6ed4bfc4e
--- /dev/null
+++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp
@@ -0,0 +1,146 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "ReceiverImpl.h"
+#include "MessageSource.h"
+#include "SessionImpl.h"
+#include "qpid/messaging/MessageListener.h"
+#include "qpid/messaging/Receiver.h"
+
+namespace qpid {
+namespace client {
+namespace amqp0_10 {
+
+using qpid::messaging::Receiver;
+
+void ReceiverImpl::received(qpid::messaging::Message&)
+{
+ //TODO: should this be configurable
+ if (capacity && --window <= capacity/2) {
+ session.sendCompletion();
+ window = capacity;
+ }
+}
+
+bool ReceiverImpl::get(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ return parent.get(*this, message, timeout);
+}
+
+qpid::messaging::Message ReceiverImpl::get(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!get(result, timeout)) throw Receiver::NoMessageAvailable();
+ return result;
+}
+
+bool ReceiverImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout)
+{
+ if (capacity == 0 && !cancelled) {
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, 1);
+ if (!started) session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+ }
+
+ if (get(message, timeout)) {
+ return true;
+ } else {
+ if (!cancelled) {
+ sync(session).messageFlush(destination);
+ start();//reallocate credit
+ }
+ return get(message, 0);
+ }
+}
+
+qpid::messaging::Message ReceiverImpl::fetch(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Message result;
+ if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable();
+ return result;
+}
+
+void ReceiverImpl::cancel()
+{
+ if (!cancelled) {
+ //TODO: should syncronicity be an optional argument to this call?
+ source->cancel(session, destination);
+ //need to be sure cancel is complete and all incoming
+ //framesets are processed before removing the receiver
+ parent.receiverCancelled(destination);
+ cancelled = true;
+ }
+}
+
+void ReceiverImpl::start()
+{
+ if (!cancelled) {
+ started = true;
+ session.messageSetFlowMode(destination, capacity > 0);
+ session.messageFlow(destination, CREDIT_UNIT_MESSAGE, capacity);
+ session.messageFlow(destination, CREDIT_UNIT_BYTE, byteCredit);
+ window = capacity;
+ }
+}
+
+void ReceiverImpl::stop()
+{
+ session.messageStop(destination);
+ started = false;
+}
+
+void ReceiverImpl::subscribe()
+{
+ source->subscribe(session, destination);
+}
+
+void ReceiverImpl::setSession(qpid::client::AsyncSession s)
+{
+ session = s;
+ if (!cancelled) {
+ subscribe();
+ //if we were in started state before the session was changed,
+ //start again on this new session
+ //TODO: locking if receiver is to be threadsafe...
+ if (started) start();
+ }
+}
+
+void ReceiverImpl::setCapacity(uint32_t c)
+{
+ if (c != capacity) {
+ capacity = c;
+ if (!cancelled && started) {
+ stop();
+ start();
+ }
+ }
+}
+
+void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; }
+qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; }
+
+const std::string& ReceiverImpl::getName() const { return destination; }
+
+ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, std::auto_ptr<MessageSource> s) :
+ parent(p), source(s), destination(name), byteCredit(0xFFFFFFFF),
+ capacity(0), started(false), cancelled(false), listener(0), window(0) {}
+
+
+}}} // namespace qpid::client::amqp0_10