summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid/client/amqp0_10/SenderImpl.cpp')
-rw-r--r--cpp/src/qpid/client/amqp0_10/SenderImpl.cpp36
1 files changed, 30 insertions, 6 deletions
diff --git a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
index e70ee8af6f..c619d1226a 100644
--- a/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
+++ b/cpp/src/qpid/client/amqp0_10/SenderImpl.cpp
@@ -22,6 +22,7 @@
#include "MessageSink.h"
#include "SessionImpl.h"
#include "AddressResolution.h"
+#include "OutgoingMessage.h"
namespace qpid {
namespace client {
@@ -30,9 +31,10 @@ namespace amqp0_10 {
SenderImpl::SenderImpl(SessionImpl& _parent, const std::string& _name,
const qpid::messaging::Address& _address,
const qpid::messaging::Variant::Map& _options) :
- parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED) {}
+ parent(_parent), name(_name), address(_address), options(_options), state(UNRESOLVED),
+ capacity(50), window(0) {}
-void SenderImpl::send(qpid::messaging::Message& m)
+void SenderImpl::send(const qpid::messaging::Message& m)
{
execute1<Send>(&m);
}
@@ -54,14 +56,36 @@ void SenderImpl::init(qpid::client::AsyncSession s, AddressResolution& resolver)
parent.senderCancelled(name);
} else {
sink->declare(session, name);
- //TODO: replay
+ replay();
}
}
-void SenderImpl::sendImpl(qpid::messaging::Message& m)
+void SenderImpl::sendImpl(const qpid::messaging::Message& m)
{
- //TODO: record for replay if appropriate
- sink->send(session, name, m);
+ //TODO: make recoding for replay optional
+ std::auto_ptr<OutgoingMessage> msg(new OutgoingMessage());
+ msg->convert(m);
+ outgoing.push_back(msg.release());
+ sink->send(session, name, outgoing.back());
+ if (++window > (capacity / 2)) {//TODO: make this configurable?
+ session.flush();
+ checkPendingSends();
+ window = 0;
+ }
+}
+
+void SenderImpl::replay()
+{
+ for (OutgoingMessages::iterator i = outgoing.begin(); i != outgoing.end(); ++i) {
+ sink->send(session, name, *i);
+ }
+}
+
+void SenderImpl::checkPendingSends()
+{
+ while (!outgoing.empty() && outgoing.front().status.isComplete()) {
+ outgoing.pop_front();
+ }
}
void SenderImpl::cancelImpl()