diff options
Diffstat (limited to 'cpp/lib/client/ClientChannel.cpp')
| -rw-r--r-- | cpp/lib/client/ClientChannel.cpp | 16 |
1 files changed, 12 insertions, 4 deletions
diff --git a/cpp/lib/client/ClientChannel.cpp b/cpp/lib/client/ClientChannel.cpp index a97d79dcf9..92f8ae63ca 100644 --- a/cpp/lib/client/ClientChannel.cpp +++ b/cpp/lib/client/ClientChannel.cpp @@ -23,6 +23,7 @@ #include <ClientMessage.h> #include <QpidError.h> #include <MethodBodyInstances.h> +#include <framing/FrameList.h> using namespace boost; //to use dynamic_pointer_cast using namespace qpid::client; @@ -219,19 +220,25 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& string e = exchange.getName(); string key = routingKey; - out->send(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); + std::auto_ptr<FrameList> message(new FrameList()); + + message->add(new AMQFrame(version, id, new BasicPublishBody(version, 0, e, key, mandatory, immediate))); //break msg up into header frame and content frame(s) and send these string data = msg.getData(); msg.header->setContentSize(data.length()); AMQBody::shared_ptr body(static_pointer_cast<AMQBody, AMQHeaderBody>(msg.header)); - out->send(new AMQFrame(version, id, body)); + message->add(new AMQFrame(version, id, body)); u_int64_t data_length = data.length(); if(data_length > 0){ u_int32_t frag_size = con->getMaxFrameSize() - 8;//frame itself uses 8 bytes - if(data_length < frag_size){ - out->send(new AMQFrame(version, id, new AMQContentBody(data))); + if(data_length + message->size() < frag_size){ + message->add(new AMQFrame(version, id, new AMQContentBody(data))); + } else if(data_length < frag_size){ + out->send(message.release()); + out->send(new AMQFrame(version, id, new AMQContentBody(data))); }else{ + out->send(message.release()); u_int32_t offset = 0; u_int32_t remaining = data_length - offset; while (remaining > 0) { @@ -244,6 +251,7 @@ void Channel::publish(Message& msg, const Exchange& exchange, const std::string& } } } + if (message.get()) out->send(message.release()); } void Channel::commit(){ |
