diff options
Diffstat (limited to 'cpp/src')
| -rw-r--r-- | cpp/src/qpid/sys/AsynchIOAcceptor.cpp | 60 |
1 files changed, 27 insertions, 33 deletions
diff --git a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp index bf4a3ff842..8d6b543dc9 100644 --- a/cpp/src/qpid/sys/AsynchIOAcceptor.cpp +++ b/cpp/src/qpid/sys/AsynchIOAcceptor.cpp @@ -190,7 +190,7 @@ void AsynchIOHandler::send(framing::AMQFrame& frame) { if (!frameQueueClosed) frameQueue.push(frame); } - + // Activate aio for writing here aio->queueWrite(); } @@ -263,46 +263,40 @@ void AsynchIOHandler::idle(AsynchIO&){ return; } - // Try and get a queued buffer if not then construct new one - AsynchIO::Buffer* buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - std::auto_ptr<framing::Buffer> out(new framing::Buffer(buff->bytes, buff->byteCount)); - int buffUsed = 0; - - while (!frameQueue.empty()) { + do { + // Try and get a queued buffer if not then construct new one + AsynchIO::Buffer* buff = aio->getQueuedBuffer(); + if (!buff) + buff = new Buff; + framing::Buffer out(buff->bytes, buff->byteCount); + int buffUsed = 0; + framing::AMQFrame frame = frameQueue.front(); - frameQueue.pop(); - - // Encode output frame int frameSize = frame.size(); + while (frameSize <= int(out.available())) { + frameQueue.pop(); + + // Encode output frame + frame.encode(out); + buffUsed += frameSize; + QPID_LOG(debug, "SENT: " << frame); + + if (frameQueue.empty()) + break; + frame = frameQueue.front(); + frameSize = frame.size(); + } + // If frame was egregiously large complain if (frameSize > buff->byteCount) THROW_QPID_ERROR(FRAMING_ERROR, "Could not write frame, too large for buffer."); - - // If we've filled current buffer then flush and get new one - if (frameSize > int(out->available())) { - buff->dataCount = buffUsed; - aio->queueWrite(buff); - - buff = aio->getQueuedBuffer(); - if (!buff) - buff = new Buff; - out.reset(new framing::Buffer(buff->bytes, buff->byteCount)); - buffUsed = 0; - } - - frame.encode(*out); - buffUsed += frameSize; - QPID_LOG(debug, "SENT: " << frame); - } - - buff->dataCount = buffUsed; - aio->queueWrite(buff); + + buff->dataCount = buffUsed; + aio->queueWrite(buff); + } while (!frameQueue.empty()); if (frameQueueClosed) { aio->queueWriteClose(); } - } }} // namespace qpid::sys |
