summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SessionImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SessionImpl.cpp44
1 files changed, 44 insertions, 0 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
index 101bc5ce0a..7f8e5f4e79 100644
--- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp
@@ -210,6 +210,19 @@ struct IncomingMessageHandler : IncomingMessages::Handler
}
+
+bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageTransfer& transfer)
+{
+ Receivers::const_iterator i = receivers.find(transfer.getDestination());
+ if (i == receivers.end()) {
+ QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination());
+ return false;
+ } else {
+ *receiver = i->second;
+ return true;
+ }
+}
+
bool SessionImpl::accept(ReceiverImpl* receiver,
qpid::messaging::Message* message,
bool isDispatch,
@@ -279,6 +292,37 @@ bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration t
}
}
+bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout)
+{
+ qpid::sys::Mutex::ScopedLock l(lock);
+ while (true) {
+ try {
+ std::string destination;
+ if (incoming.getNextDestination(destination, timeout)) {
+ Receivers::const_iterator i = receivers.find(destination);
+ if (i == receivers.end()) {
+ throw qpid::Exception(QPID_MSG("Received message for unknown destination " << destination));
+ } else {
+ receiver = i->second;
+ }
+ return true;
+ } else {
+ return false;
+ }
+ } catch (TransportFailure&) {
+ reconnect();
+ }
+ }
+}
+
+qpid::messaging::Receiver SessionImpl::nextReceiver(qpid::sys::Duration timeout)
+{
+ qpid::messaging::Receiver receiver;
+ if (!nextReceiver(receiver, timeout)) throw Receiver::NoMessageAvailable();
+ if (!receiver) throw qpid::Exception("Bad receiver returned!");
+ return receiver;
+}
+
uint32_t SessionImpl::available()
{
return get1<Available, uint32_t>((const std::string*) 0);