diff options
| author | Gordon Sim <gsim@apache.org> | 2009-11-16 11:58:45 +0000 |
|---|---|---|
| committer | Gordon Sim <gsim@apache.org> | 2009-11-16 11:58:45 +0000 |
| commit | efc6473096622a01b2a3907093431b49d8ebfb1e (patch) | |
| tree | 27c712cf1eeff318c0663e77cdea3db4a5e54095 /cpp/src/qpid/client | |
| parent | 454379917ad7b797a045cbefc56bf598e3fd534b (diff) | |
| download | qpid-python-efc6473096622a01b2a3907093431b49d8ebfb1e.tar.gz | |
Merge branch 'next_receiver_changes' into trunk
git-svn-id: https://svn.apache.org/repos/asf/qpid/trunk/qpid@880718 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp | 5 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/ReceiverImpl.h | 2 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.cpp | 72 | ||||
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SessionImpl.h | 12 |
4 files changed, 3 insertions, 88 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp index f294d7e273..83b245aa02 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.cpp @@ -22,7 +22,6 @@ #include "AddressResolution.h" #include "MessageSource.h" #include "SessionImpl.h" -#include "qpid/messaging/MessageListener.h" #include "qpid/messaging/Receiver.h" namespace qpid { @@ -115,8 +114,6 @@ void ReceiverImpl::init(qpid::client::AsyncSession s, AddressResolution& resolve } } -void ReceiverImpl::setListener(qpid::messaging::MessageListener* l) { listener = l; } -qpid::messaging::MessageListener* ReceiverImpl::getListener() { return listener; } const std::string& ReceiverImpl::getName() const { return destination; } @@ -139,7 +136,7 @@ ReceiverImpl::ReceiverImpl(SessionImpl& p, const std::string& name, const qpid::messaging::Address& a) : parent(p), destination(name), address(a), byteCredit(0xFFFFFFFF), - state(UNRESOLVED), capacity(0), listener(0), window(0) {} + state(UNRESOLVED), capacity(0), window(0) {} bool ReceiverImpl::getImpl(qpid::messaging::Message& message, qpid::sys::Duration timeout) { diff --git a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h index d05fd3d045..3a18368116 100644 --- a/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h +++ b/cpp/src/qpid/client/amqp0_10/ReceiverImpl.h @@ -62,8 +62,6 @@ class ReceiverImpl : public qpid::messaging::ReceiverImpl uint32_t getCapacity(); uint32_t available(); uint32_t pendingAck(); - void setListener(qpid::messaging::MessageListener* listener); - qpid::messaging::MessageListener* getListener(); void received(qpid::messaging::Message& message); private: SessionImpl& parent; diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp index 7f8e5f4e79..d0085dad75 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.cpp @@ -30,7 +30,6 @@ #include "qpid/messaging/Address.h" #include "qpid/messaging/Message.h" #include "qpid/messaging/MessageImpl.h" -#include "qpid/messaging/MessageListener.h" #include "qpid/messaging/Sender.h" #include "qpid/messaging/Receiver.h" #include "qpid/messaging/Session.h" @@ -177,13 +176,6 @@ Sender SessionImpl::createSenderImpl(const qpid::messaging::Address& address) return sender; } -qpid::messaging::Address SessionImpl::createTempQueue(const std::string& baseName) -{ - std::string name = baseName + std::string("_") + session.getId().getName(); - session.queueDeclare(arg::queue=name, arg::exclusive=true, arg::autoDelete=true); - return qpid::messaging::Address(name); -} - SessionImpl& SessionImpl::convert(qpid::messaging::Session& s) { boost::intrusive_ptr<SessionImpl> impl = getImplPtr<qpid::messaging::Session, SessionImpl>(s); @@ -225,16 +217,10 @@ bool SessionImpl::getNextReceiver(Receiver* receiver, IncomingMessages::MessageT bool SessionImpl::accept(ReceiverImpl* receiver, qpid::messaging::Message* message, - bool isDispatch, IncomingMessages::MessageTransfer& transfer) { if (receiver->getName() == transfer.getDestination()) { transfer.retrieve(message); - if (isDispatch) { - qpid::sys::Mutex::ScopedUnlock u(lock); - qpid::messaging::MessageListener* listener = receiver->getListener(); - if (listener) listener->received(*message); - } receiver->received(*message); return true; } else { @@ -242,18 +228,6 @@ bool SessionImpl::accept(ReceiverImpl* receiver, } } -bool SessionImpl::acceptAny(qpid::messaging::Message* message, bool isDispatch, IncomingMessages::MessageTransfer& transfer) -{ - Receivers::iterator i = receivers.find(transfer.getDestination()); - if (i == receivers.end()) { - QPID_LOG(error, "Received message for unknown destination " << transfer.getDestination()); - return false; - } else { - boost::intrusive_ptr<ReceiverImpl> receiver = getImplPtr<Receiver, ReceiverImpl>(i->second); - return receiver && (!isDispatch || receiver->getListener()) && accept(receiver.get(), message, isDispatch, transfer); - } -} - bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout) { return incoming.get(handler, timeout); @@ -261,37 +235,10 @@ bool SessionImpl::getIncoming(IncomingMessages::Handler& handler, qpid::sys::Dur bool SessionImpl::get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout) { - IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, false, _1)); + IncomingMessageHandler handler(boost::bind(&SessionImpl::accept, this, &receiver, &message, _1)); return getIncoming(handler, timeout); } -bool SessionImpl::dispatch(qpid::sys::Duration timeout) -{ - qpid::sys::Mutex::ScopedLock l(lock); - while (true) { - try { - qpid::messaging::Message message; - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, true, _1)); - return getIncoming(handler, timeout); - } catch (TransportFailure&) { - reconnect(); - } - } -} - -bool SessionImpl::fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout) -{ - qpid::sys::Mutex::ScopedLock l(lock); - while (true) { - try { - IncomingMessageHandler handler(boost::bind(&SessionImpl::acceptAny, this, &message, false, _1)); - return getIncoming(handler, timeout); - } catch (TransportFailure&) { - reconnect(); - } - } -} - bool SessionImpl::nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout) { qpid::sys::Mutex::ScopedLock l(lock); @@ -418,13 +365,6 @@ void SessionImpl::rejectImpl(qpid::messaging::Message& m) session.messageReject(set); } -qpid::messaging::Message SessionImpl::fetch(qpid::sys::Duration timeout) -{ - qpid::messaging::Message result; - if (!fetch(result, timeout)) throw Receiver::NoMessageAvailable(); - return result; -} - void SessionImpl::receiverCancelled(const std::string& name) { receivers.erase(name); @@ -442,14 +382,4 @@ void SessionImpl::reconnect() connection.reconnect(); } -void* SessionImpl::getLastConfirmedSent() -{ - return 0; -} - -void* SessionImpl::getLastConfirmedAcknowledged() -{ - return 0; -} - }}} // namespace qpid::client::amqp0_10 diff --git a/cpp/src/qpid/client/amqp0_10/SessionImpl.h b/cpp/src/qpid/client/amqp0_10/SessionImpl.h index ec9a6162c1..f3018b9685 100644 --- a/cpp/src/qpid/client/amqp0_10/SessionImpl.h +++ b/cpp/src/qpid/client/amqp0_10/SessionImpl.h @@ -62,21 +62,12 @@ class SessionImpl : public qpid::messaging::SessionImpl void close(); void sync(); void flush(); - qpid::messaging::Address createTempQueue(const std::string& baseName); qpid::messaging::Sender createSender(const qpid::messaging::Address& address); qpid::messaging::Receiver createReceiver(const qpid::messaging::Address& address); - void* getLastConfirmedSent(); - void* getLastConfirmedAcknowledged(); - - bool fetch(qpid::messaging::Message& message, qpid::sys::Duration timeout); - qpid::messaging::Message fetch(qpid::sys::Duration timeout); - bool dispatch(qpid::sys::Duration timeout); - bool nextReceiver(qpid::messaging::Receiver& receiver, qpid::sys::Duration timeout); qpid::messaging::Receiver nextReceiver(qpid::sys::Duration timeout); - bool get(ReceiverImpl& receiver, qpid::messaging::Message& message, qpid::sys::Duration timeout); void receiverCancelled(const std::string& name); @@ -116,8 +107,7 @@ class SessionImpl : public qpid::messaging::SessionImpl Receivers receivers; Senders senders; - bool acceptAny(qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); - bool accept(ReceiverImpl*, qpid::messaging::Message*, bool, IncomingMessages::MessageTransfer&); + bool accept(ReceiverImpl*, qpid::messaging::Message*, IncomingMessages::MessageTransfer&); bool getIncoming(IncomingMessages::Handler& handler, qpid::sys::Duration timeout); bool getNextReceiver(qpid::messaging::Receiver* receiver, IncomingMessages::MessageTransfer& transfer); void reconnect(); |
