From 70da584bd2e48bd56320f7ca1f6e94dfa430596d Mon Sep 17 00:00:00 2001 From: Alan Conway Date: Thu, 5 Apr 2007 21:23:14 +0000 Subject: * cpp/src/broker/BrokerMessageMessage.h: Change reference from weak_ptr to shared_ptr. Broker messages hold their reference. * cpp/src/broker/Reference.cpp (close): clear messages array to break shared_ptr cycle and avoid a leak. * cpp/src/client/MessageMessageChannel.cpp (publish): Support references for large messages. * cpp/src/shared_ptr.h (make_shared_ptr): added deleter variant. * cpp/src/tests/ClientChannelTest.cpp: Enabled testGetNoContent, testGetFragmentedMessage git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@525964 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/broker/BrokerMessageMessage.cpp | 2 +- cpp/src/broker/BrokerMessageMessage.h | 2 +- cpp/src/broker/Reference.cpp | 1 + cpp/src/client/MessageMessageChannel.cpp | 43 +++++++++++++++++++++++++++++--- cpp/src/shared_ptr.h | 5 ++++ cpp/src/tests/ClientChannelTest.cpp | 2 ++ 6 files changed, 49 insertions(+), 6 deletions(-) diff --git a/cpp/src/broker/BrokerMessageMessage.cpp b/cpp/src/broker/BrokerMessageMessage.cpp index d8eb0fada0..e34cd61f6c 100644 --- a/cpp/src/broker/BrokerMessageMessage.cpp +++ b/cpp/src/broker/BrokerMessageMessage.cpp @@ -312,7 +312,7 @@ MessageTransferBody* MessageMessage::copyTransfer(const ProtocolVersion& version } MessageMessage::ReferencePtr MessageMessage::getReference() const { - return reference.lock(); + return reference; } diff --git a/cpp/src/broker/BrokerMessageMessage.h b/cpp/src/broker/BrokerMessageMessage.h index 31962d5879..976b882c7e 100644 --- a/cpp/src/broker/BrokerMessageMessage.h +++ b/cpp/src/broker/BrokerMessageMessage.h @@ -93,7 +93,7 @@ class MessageMessage: public Message{ framing::RequestId requestId; const TransferPtr transfer; - const boost::weak_ptr reference; + const boost::shared_ptr reference; }; }} diff --git a/cpp/src/broker/Reference.cpp b/cpp/src/broker/Reference.cpp index ef55d3e6a2..1ef2eb44d0 100644 --- a/cpp/src/broker/Reference.cpp +++ b/cpp/src/broker/Reference.cpp @@ -46,6 +46,7 @@ void Reference::append(AppendPtr ptr) { } void Reference::close() { + messages.clear(); registry->references.erase(getId()); } diff --git a/cpp/src/client/MessageMessageChannel.cpp b/cpp/src/client/MessageMessageChannel.cpp index 164a1cb426..8d0fdc3189 100644 --- a/cpp/src/client/MessageMessageChannel.cpp +++ b/cpp/src/client/MessageMessageChannel.cpp @@ -192,6 +192,17 @@ MessageTransferBody::shared_ptr makeTransfer( )); } +// FIXME aconway 2007-04-05: Generated code should provide this. +/** + * Calculate the size of a frame containing the given body type + * if all variable-lengths parts are empty. + */ +template size_t overhead() { + static AMQFrame frame( + ProtocolVersion(), 0, make_shared_ptr(new T(ProtocolVersion()))); + return frame.size(); +} + void MessageMessageChannel::publish( const Message& msg, const Exchange& exchange, const std::string& routingKey, bool mandatory, bool immediate) @@ -201,11 +212,35 @@ void MessageMessageChannel::publish( msg, exchange.getName(), routingKey, mandatory, immediate); // Frame itself uses 8 bytes. u_int32_t frameMax = channel.connection->getMaxFrameSize() - 8; - if (transfer->size() > frameMax) { - // FIXME aconway 2007-02-23: - throw QPID_ERROR(INTERNAL_ERROR, "References not yet implemented"); + if (transfer->size() <= frameMax) { + channel.sendAndReceive(transfer); + } + else { + std::string ref = newTag(); + std::string data = transfer->getBody().getValue(); + size_t chunk = + channel.connection->getMaxFrameSize() - + (overhead() + ref.size()); + // TODO aconway 2007-04-05: cast around lack of generated setters + const_cast(transfer->getBody()) = Content(REFERENCE,ref); + channel.send( + make_shared_ptr(new MessageOpenBody(channel.version, ref))); + channel.send(transfer); + const char* p = data.data(); + const char* end = data.data()+data.size(); + while (p+chunk <= end) { + channel.send( + make_shared_ptr( + new MessageAppendBody(channel.version, ref, std::string(p, chunk)))); + p += chunk; + } + if (p < end) { + channel.send( + make_shared_ptr( + new MessageAppendBody(channel.version, ref, std::string(p, end-p)))); + } + channel.send(make_shared_ptr(new MessageCloseBody(channel.version, ref))); } - channel.sendAndReceive(transfer); } void copy(Message& msg, MessageTransferBody& transfer) { diff --git a/cpp/src/shared_ptr.h b/cpp/src/shared_ptr.h index df08c325df..eb5f3f906a 100644 --- a/cpp/src/shared_ptr.h +++ b/cpp/src/shared_ptr.h @@ -37,6 +37,11 @@ template shared_ptr make_shared_ptr(T* ptr) { return shared_ptr(ptr); } +template +shared_ptr make_shared_ptr(T* ptr, D deleter) { + return shared_ptr(ptr, deleter); +} + } // namespace qpid diff --git a/cpp/src/tests/ClientChannelTest.cpp b/cpp/src/tests/ClientChannelTest.cpp index 8dc3e4b432..18c71527b6 100644 --- a/cpp/src/tests/ClientChannelTest.cpp +++ b/cpp/src/tests/ClientChannelTest.cpp @@ -207,6 +207,8 @@ class BasicClientChannelTest : public ClientChannelTestBase { class MessageClientChannelTest : public ClientChannelTestBase { CPPUNIT_TEST_SUITE(MessageClientChannelTest); CPPUNIT_TEST(testPublishGet); + CPPUNIT_TEST(testGetNoContent); + CPPUNIT_TEST(testGetFragmentedMessage); CPPUNIT_TEST_SUITE_END(); public: MessageClientChannelTest() { -- cgit v1.2.1