/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. */ #include #include #include "qpid/client/AsyncSession.h" #include "qpid/framing/FrameSet.h" #include "qpid/framing/AMQFrame.h" #include "MessageBodyStream.h" namespace Apache { namespace Qpid { namespace Interop { using namespace System; using namespace System::Runtime::InteropServices; using namespace System::Threading; using namespace msclr; using namespace qpid::client; using namespace qpid::framing; // The following def must match "Frames" private typedef. // TODO: make "Frames" publicly visible.
typedef qpid::InlineVector FrameSetFrames; using namespace std; static void ThrowIfBadArgs (array^ buffer, int offset, int count) { if (buffer == nullptr) throw gcnew ArgumentNullException("buffer"); if (offset < 0) throw gcnew ArgumentOutOfRangeException("offset"); if (count < 0) throw gcnew ArgumentOutOfRangeException("count"); if ((offset + count) > buffer->Length) throw gcnew ArgumentException("offset + count"); } // Input stream constructor MessageBodyStream::MessageBodyStream(FrameSet::shared_ptr *fspp) { isInputStream = true; frameSetpp = fspp; fragmentCount = 0; length = 0; position = 0; currentFramep = NULL; const std::string *datap; // pointer to the fragment's string variable that holds the content for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { if (i->getBody()->type() == CONTENT_BODY) { fragmentCount++; datap = &(i->castBody()->getData()); length += datap->size(); } } // fragmentCount can be zero for an empty message fragmentIndex = 0; fragmentPosition = 0; if (fragmentCount == 0) { currentFragment = NULL; fragmentLength = 0; } else if (fragmentCount == 1) { currentFragment = datap->data(); fragmentLength = (int) length; } else { fragments = gcnew array(fragmentCount); fragmentIndex = 0; for(FrameSetFrames::const_iterator i = (*frameSetpp)->begin(); i != (*frameSetpp)->end(); i++) { if (i->getBody()->type() == CONTENT_BODY) { datap = &(i->castBody()->getData()); fragments[fragmentIndex++] = (IntPtr) (void *) datap; } } fragmentIndex = 0; datap = (const std::string *) fragments[0].ToPointer(); currentFragment = datap->data(); fragmentLength = datap->size(); } } int MessageBodyStream::Read(array^ buffer, int offset, int count) { if (!isInputStream) throw gcnew NotSupportedException(); if (disposed) throw gcnew ObjectDisposedException("Stream"); if (count == 0) return 0; ThrowIfBadArgs(buffer, offset, count); int nRead = 0; int remaining = count; while (nRead < count) { int fragAvail = fragmentLength - 
fragmentPosition; int copyCount = min (fragAvail, remaining); if (copyCount == 0) { // no more to read return nRead; } // copy from native space IntPtr nativep = (IntPtr) (void *) (currentFragment + fragmentPosition); Marshal::Copy (nativep, buffer, offset, copyCount); nRead += copyCount; remaining -= copyCount; fragmentPosition += copyCount; offset += copyCount; // advance to next fragment? if (fragmentPosition == fragmentLength) { if (++fragmentIndex < fragmentCount) { const std::string *datap = (const std::string *) fragments[fragmentIndex].ToPointer(); currentFragment = datap->data(); fragmentLength = datap->size(); fragmentPosition = 0; } } } return nRead; } void MessageBodyStream::pushCurrentFrame(bool lastFrame) { // set flags as in SessionImpl::sendContent. if (currentFramep->getBody()->type() == CONTENT_BODY) { if ((fragmentCount == 1) && lastFrame) { // only one content frame currentFramep->setFirstSegment(false); } else { currentFramep->setFirstSegment(false); currentFramep->setLastSegment(true); if (fragmentCount != 1) { currentFramep->setFirstFrame(false); } if (!lastFrame) { currentFramep->setLastFrame(false); } } } else { // the header frame currentFramep->setFirstSegment(false); if (!lastFrame) { // there will be at least one content frame currentFramep->setLastSegment(false); } } // add to frame set. This makes a copy and ref counts the body (*frameSetpp)->append(*currentFramep); delete currentFramep; currentFramep = NULL; } IntPtr MessageBodyStream::GetFrameSet() { if (currentFramep != NULL) { // No more content. Tidy up the pending (possibly single header) frame. 
pushCurrentFrame(true); } if (frameSetpp == NULL) { return (IntPtr) NULL; } // shared_ptr.get() return (IntPtr) (void *) (*frameSetpp).get(); } IntPtr MessageBodyStream::GetHeader() { return (IntPtr) headerBodyp; } // Ouput stream constructor MessageBodyStream::MessageBodyStream(int maxFrameSize) { isInputStream = false; maxFrameContentSize = maxFrameSize - AMQFrame::frameOverhead(); SequenceNumber unused; // only meaningful on incoming frames frameSetpp = new FrameSet::shared_ptr(new FrameSet(unused)); fragmentCount = 0; length = 0; position = 0; // header goes first in the outgoing frameset boost::intrusive_ptr headerBody(new AMQHeaderBody); currentFramep = new AMQFrame(headerBody); headerBodyp = static_cast(headerBody.get()); // mark this header frame as "full" to force the first write to create a new content frame fragmentPosition = maxFrameContentSize; } void MessageBodyStream::Write(array^ buffer, int offset, int count) { if (isInputStream) throw gcnew NotSupportedException(); if (disposed) throw gcnew ObjectDisposedException("Stream"); if (count == 0) return; ThrowIfBadArgs(buffer, offset, count); if (currentFramep == NULL) { // GetFrameSet() has been called and we no longer exclusively own the underlying frames. throw gcnew InvalidOperationException ("Mesage Body output already completed"); } if (count <= 0) return; // keep GC memory movement at bay while copying to native space pin_ptr pinnedBuf = &buffer[0]; string *datap; int remaining = count; while (remaining > 0) { if (fragmentPosition == maxFrameContentSize) { // move to a new frame, but not until ready to add new content. 
// zero content is valid, or the final write may exactly fill to maxFrameContentSize pushCurrentFrame(false); currentFramep = new AMQFrame(AMQContentBody()); fragmentPosition = 0; fragmentCount++; } int copyCount = min (remaining, (maxFrameContentSize - fragmentPosition)); datap = &(currentFramep->castBody()->getData()); char *outp = (char *) pinnedBuf + offset; if (fragmentPosition == 0) { datap->assign(outp, copyCount); } else { datap->append(outp, copyCount); } position += copyCount; fragmentPosition += copyCount; remaining -= copyCount; offset += copyCount; } } void MessageBodyStream::Cleanup() { { lock l(this); if (disposed) return; disposed = true; } try {} finally { if (frameSetpp != NULL) { delete frameSetpp; frameSetpp = NULL; } if (currentFramep != NULL) { delete currentFramep; currentFramep = NULL; } } } MessageBodyStream::~MessageBodyStream() { Cleanup(); } MessageBodyStream::!MessageBodyStream() { Cleanup(); } void MessageBodyStream::Close() { // Simulate Dispose()... Cleanup(); GC::SuppressFinalize(this); } }}} // namespace Apache::Qpid::Interop