diff options
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SenderImpl.cpp')
| -rw-r--r-- | cpp/src/qpid/client/amqp0_10/SenderImpl.cpp | 36 |
1 files changed, 30 insertions, 6 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp index e70ee8af6f..c619d1226a 100644 --- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp +++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp @@ -22,6 +22,7 @@ #include "MessageSink.h" #include "SessionImpl.h" #include "AddressResolution.h" +#include "OutgoingMessage.h" namespace qpid { namespace client { @@ -30,9 +31,10 @@ namespace amqp0_10 { SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name, const qpid::messaging::Address& _address, const qpid::messaging::Variant::Map& _options) : - parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {} + parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED), + capacity(50), window(0) {} -void SenderImpl::send(qpid::messaging::Message& m) +void SenderImpl::send(const qpid::messaging::Message& m) { execute1<Send>(&m); } @@ -54,14 +56,36 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver) parent.senderCancelled(name); } else { sink->declare(session, name); - //TODO: replay + replay(); } } -void SenderImpl::sendImpl(qpid::messaging::Message& m) +void SenderImpl::sendImpl(const qpid::messaging::Message& m) { - //TODO: record for replay if appropriate - sink->send(session, name, m); + //TODO: make recoding for replay optional + std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage()); + msg->convert(m); + outgoing.push_back(msg.release()); + sink->send(session, name, outgoing.back()); + if (++window > (capacity / 2)) {//TODO: make this configurable? + session.flush(); + checkPendingSends(); + window = 0; + } +} + +void SenderImpl::replay() +{ + for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) { + sink->send(session, name, *i); + } +} + +void SenderImpl::checkPendingSends() +{ + while (!outgoing.empty() && outgoing.front().status.isComplete()) { + outgoing.pop_front(); + } } void SenderImpl::cancelImpl() |
