summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2009-11-12 10:30:53 +0000
committerGordon Sim <gsim@apache.org>2009-11-12 10:30:53 +0000
commit51e77c8bc7dc4d71422b421135ded1cb33bb5c55 (patch)
tree0588f217faf0babf5d4de255d113c05214a997ae /cpp/src/qpid/client
parentf3de267295dc756a0abaa6187562396374cba41c (diff)
downloadqpid-python-51e77c8bc7dc4d71422b421135ded1cb33bb5c55.tar.gz
Merge branch 'next_receiver_changes' into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@835323 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp35
-rw-r--r--cpp/src/qpid/client/amqp0_10/IncomingMessages.h4
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp44
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.h5
4 files changed, 81 insertions, 7 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
index 8e060c62d7..e66dc5915c 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.cpp
@@ -123,6 +123,15 @@ bool IncomingMessages::get(Handler& handler, Duration timeout)
return process(&handler, timeout);
}
+bool IncomingMessages::getNextDestination(std::string& destination, Duration timeout)
+{
+ //if there is not already a received message, we must wait for one
+ if (received.empty() && !wait(timeout)) return false;
+ //else we have a message in received; return the corresponding destination
+ destination = received.front()->as<MessageTransferBody>()->getDestination();
+ return true;
+}
+
void IncomingMessages::accept()
{
acceptTracker.accept(session);
@@ -155,11 +164,11 @@ void IncomingMessages::releasePending(const std::string& destination)
}
/**
- * Get a frameset from session queue, waiting for up to the specified
- * duration and returning true if this could be achieved, false
- * otherwise. If a destination is supplied, only return a message for
- * that destination. In this case messages from other destinations
- * will be held on a received queue.
+ * Get a frameset that is accepted by the specified handler from
+ * session queue, waiting for up to the specified duration and
+ * returning true if this could be achieved, false otherwise. Messages
+ * that are not accepted by the handler are pushed onto received queue
+ * for later retrieval.
*/
bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
{
@@ -183,6 +192,22 @@ bool IncomingMessages::process(Handler* handler, qpid::sys::Duration duration)
return false;
}
+bool IncomingMessages::wait(qpid::sys::Duration duration)
+{
+ AbsTime deadline(AbsTime::now(), duration);
+ FrameSet::shared_ptr content;
+ for (Duration timeout = duration; incoming->pop(content, timeout); timeout = Duration(AbsTime::now(), deadline)) {
+ if (content->isA<MessageTransferBody>()) {
+ QPID_LOG(debug, "Pushed " << *content->getMethod() << " to received queue");
+ received.push_back(content);
+ return true;
+ } else {
+ //TODO: handle other types of commands (e.g. message-accept, message-flow etc)
+ }
+ }
+ return false;
+}
+
uint32_t IncomingMessages::pendingAccept()
{
return acceptTracker.acceptsPending();
diff --git a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
index e84cd18892..2bc6dd49c4 100644
--- a/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
+++ b/cpp/src/qpid/client/amqp0_10/IncomingMessages.h
@@ -70,8 +70,7 @@ class IncomingMessages
void setSession(qpid::client::AsyncSession session);
bool get(Handler& handler, qpid::sys::Duration timeout);
- //bool get(qpid::messaging::Message& message, qpid::sys::Duration timeout);
- //bool get(const std::string& destination, qpid::messaging::Message& message, qpid::sys::Duration timeout);
+ bool getNextDestination(std::string& destination, qpid::sys::Duration timeout);
void accept();
void releaseAll();
void releasePending(const std::string& destination);
@@ -90,6 +89,7 @@ class IncomingMessages
AcceptTracker acceptTracker;
bool process(Handler*, qpid::sys::Duration);
+ bool wait(qpid::sys::Duration);
void retrieve(FrameSetPtr, qpid::messaging::Message*);
};
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 101bc5ce0a..7f8e5f4e79 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -210,6 +210,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler
}
+
+bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
+{
+ Receivers::const_iterator i = receivers.find(transfer.getDestination());
+ if (i == receivers.end()) {
+ QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+ return false;
+ } else {
+ *receiver = i->second;
+ return true;
+ }
+}
+
bool SessionImpl::accept(ReceiverImpl* receiver,
qpid::messaging::Message* message,
bool isDispatch,
@@ -279,6 +292,37 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t
}
}
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (true) {
+ try {
+ std::string destination;
+ if (incoming.getNextDestination(destination, timeout)) {
+ Receivers::const_iterator i = receivers.find(destination);
+ if (i == receivers.end()) {
+ throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
+ } else {
+ receiver = i->second;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (TransportFailure&) {
+ reconnect();
+ }
+ }
+}
+
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Receiver receiver;
+ if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
+ if (!receiver) throw qpid::Exception("Bad receiver returned!");
+ return receiver;
+}
+
uint32_t SessionImpl::available()
{
return get1<Available, uint32_t>((const std::string*) 0);
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
index 9a7918d473..ec9a6162c1 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h
@@ -73,6 +73,10 @@ class SessionImpl : public qpid::messaging::SessionImpl
qpid::messaging::Message fetch(qpid::sys::Duration timeout);
bool dispatch(qpid::sys::Duration timeout);
+ bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout);
+ qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout);
+
+
bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout);
void receiverCancelled(const std::string& name);
@@ -115,6 +119,7 @@ class SessionImpl : public qpid::messaging::SessionImpl
bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&);
bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout);
+ bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer);
void reconnect();
void commitImpl();