From 9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 Mon Sep 17 00:00:00 2001 From: Gordon Sim Date: Tue, 28 Aug 2007 19:38:17 +0000 Subject: Updated message.transfer encoding to use header and content segments (including new structs). Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68 --- cpp/src/qpid/client/ChannelHandler.cpp | 4 +- cpp/src/qpid/client/ClientChannel.cpp | 8 +-- cpp/src/qpid/client/ClientChannel.h | 2 +- cpp/src/qpid/client/ClientMessage.h | 16 +++++ cpp/src/qpid/client/ConnectionHandler.cpp | 2 +- cpp/src/qpid/client/ConnectionImpl.cpp | 2 +- cpp/src/qpid/client/Connector.cpp | 2 +- cpp/src/qpid/client/ExecutionHandler.cpp | 21 +++--- cpp/src/qpid/client/ExecutionHandler.h | 6 +- cpp/src/qpid/client/ReceivedContent.cpp | 105 ------------------------------ cpp/src/qpid/client/ReceivedContent.h | 75 --------------------- cpp/src/qpid/client/SessionCore.cpp | 2 +- cpp/src/qpid/client/SessionCore.h | 4 +- 13 files changed, 42 insertions(+), 207 deletions(-) delete mode 100644 cpp/src/qpid/client/ReceivedContent.cpp delete mode 100644 cpp/src/qpid/client/ReceivedContent.h (limited to 'cpp/src/qpid/client') diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp index b3d720baf0..754b0544c6 100644 --- a/cpp/src/qpid/client/ChannelHandler.cpp +++ b/cpp/src/qpid/client/ChannelHandler.cpp @@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id) id = _id; setState(OPENING); - AMQFrame f(version, id, ChannelOpenBody(version)); + AMQFrame f(id, ChannelOpenBody(version)); out(f); std::set states; @@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id) void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId) { setState(CLOSING); - AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId)); + AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId)); out(f); } diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp index d1cc4734eb..cc2b7aedc8 100644 --- a/cpp/src/qpid/client/ClientChannel.cpp +++ b/cpp/src/qpid/client/ClientChannel.cpp @@ -181,8 +181,8 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) { if (response.isA()) { return false; } else { - ReceivedContent::shared_ptr content = gets.pop(); - content->populate(msg); + FrameSet::shared_ptr content = gets.pop(); + msg.populate(*content); return true; } } @@ -232,13 +232,13 @@ void Channel::join() { void Channel::run() { try { while (true) { - ReceivedContent::shared_ptr content = session->get(); + FrameSet::shared_ptr content = session->get(); //need to dispatch this to the relevant listener: if (content->isA()) { ConsumerMap::iterator i = consumers.find(content->as()->getConsumerTag()); if (i != consumers.end()) { Message msg; - content->populate(msg); + msg.populate(*content); i->second.listener->received(msg); } else { QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod()); diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h index d73addc950..c355fe007a 100644 --- a/cpp/src/qpid/client/ClientChannel.h +++ b/cpp/src/qpid/client/ClientChannel.h @@ -83,7 +83,7 @@ class Channel : private sys::Runnable std::auto_ptr session; SessionCore::shared_ptr sessionCore; framing::ChannelId channelId; - BlockingQueue gets; + BlockingQueue gets; framing::Uuid uniqueId; uint32_t nameCounter; diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h index fd33fbc830..19b0f867bc 100644 --- a/cpp/src/qpid/client/ClientMessage.h +++ b/cpp/src/qpid/client/ClientMessage.h @@ -23,8 +23,13 @@ */ #include #include "qpid/framing/BasicHeaderProperties.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" +#include "qpid/framing/BasicDeliverBody.h" +#include "qpid/framing/BasicGetOkBody.h" +#include "qpid/framing/MessageTransferBody.h" + namespace qpid { namespace client { @@ -55,6 +60,17 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon const HeaderProperties& getMethodHeaders() const { return *this; } + + //TODO: move this elsewhere (GRS 24/08/2007) + void populate(framing::FrameSet& frameset) + { + const BasicHeaderProperties* properties = frameset.getHeaders()->get(); + if (properties) { + BasicHeaderProperties::copy(*this, *properties); + } + frameset.getContent(data); + } + private: std::string data; std::string destination; diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp index 66db9384e2..40e13593ea 100644 --- a/cpp/src/qpid/client/ConnectionHandler.cpp +++ b/cpp/src/qpid/client/ConnectionHandler.cpp @@ -109,7 +109,7 @@ void ConnectionHandler::close() void ConnectionHandler::send(const framing::AMQBody& body) { - AMQFrame f(ProtocolVersion(), 0, body); + AMQFrame f(0, body); out(f); } diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp index e63ac69da6..b4d2156c31 100644 --- a/cpp/src/qpid/client/ConnectionImpl.cpp +++ b/cpp/src/qpid/client/ConnectionImpl.cpp @@ -107,7 +107,7 @@ void ConnectionImpl::idleIn() void ConnectionImpl::idleOut() { - AMQFrame frame(version, 0, new AMQHeartbeatBody()); + AMQFrame frame(0, new AMQHeartbeatBody()); connector->send(frame); } diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp index b25f19e4ba..6e12a9c84f 100644 --- a/cpp/src/qpid/client/Connector.cpp +++ b/cpp/src/qpid/client/Connector.cpp @@ -180,7 +180,7 @@ void Connector::run(){ inbuf.move(received); inbuf.flip();//position = 0, limit = total data read - AMQFrame frame(version); + AMQFrame frame; while(frame.decode(inbuf)){ QPID_LOG(trace, "RECV: " << frame); input->received(frame); diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp index 6c2600d00b..d10b3d3fe8 100644 --- a/cpp/src/qpid/client/ExecutionHandler.cpp +++ b/cpp/src/qpid/client/ExecutionHandler.cpp @@ -64,9 +64,9 @@ void ExecutionHandler::handle(AMQFrame& frame) if (!invoke(body, this)) { if (isContentFrame(frame)) { if (!arriving) { - arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm)); + arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm)); } - arriving->append(body); + arriving->append(frame); if (arriving->isComplete()) { received.push(arriving); arriving.reset(); @@ -123,7 +123,7 @@ void ExecutionHandler::sync() void ExecutionHandler::sendFlush() { - AMQFrame frame(version, 0, ExecutionFlushBody()); + AMQFrame frame(0, ExecutionFlushBody()); out(frame); } @@ -139,8 +139,7 @@ void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener correlation.listen(g); } - AMQFrame frame(version, 0/*id will be filled in be channel handler*/, - command); + AMQFrame frame(0/*id will be filled in be channel handler*/, command); out(frame); } @@ -149,10 +148,10 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp { send(command, f, g); - AMQHeaderBody header(BASIC); - BasicHeaderProperties::copy(*static_cast(header.getProperties()), headers); - header.setContentSize(data.size()); - AMQFrame h(version, 0, header); + AMQHeaderBody header; + BasicHeaderProperties::copy(*header.get(true), headers); + header.get(true)->setContentLength(data.size()); + AMQFrame h(0, header); out(h); u_int64_t data_length = data.length(); @@ -160,7 +159,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp //frame itself uses 8 bytes u_int32_t frag_size = maxFrameSize - 8; if(data_length < frag_size){ - AMQFrame frame(version, 0, AMQContentBody(data)); + AMQFrame frame(0, AMQContentBody(data)); out(frame); }else{ u_int32_t offset = 0; @@ -168,7 +167,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp while (remaining > 0) { u_int32_t length = remaining > frag_size ? frag_size : remaining; string frag(data.substr(offset, length)); - AMQFrame frame(version, 0, AMQContentBody(frag)); + AMQFrame frame(0, AMQContentBody(frag)); out(frame); offset += length; remaining = data_length - offset; diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h index b409d5df7b..f740e14185 100644 --- a/cpp/src/qpid/client/ExecutionHandler.h +++ b/cpp/src/qpid/client/ExecutionHandler.h @@ -23,12 +23,12 @@ #include #include "qpid/framing/AMQP_ServerOperations.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/SequenceNumber.h" #include "BlockingQueue.h" #include "ChainableFrameHandler.h" #include "CompletionTracker.h" #include "Correlator.h" -#include "ReceivedContent.h" namespace qpid { namespace client { @@ -39,7 +39,7 @@ class ExecutionHandler : { framing::Window incoming; framing::Window outgoing; - ReceivedContent::shared_ptr arriving; + framing::FrameSet::shared_ptr arriving; Correlator correlation; CompletionTracker completion; framing::ProtocolVersion version; @@ -52,7 +52,7 @@ class ExecutionHandler : void sync(); public: - BlockingQueue received; + BlockingQueue received; ExecutionHandler(uint64_t maxFrameSize = 65536); diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp deleted file mode 100644 index 5a1f48901a..0000000000 --- a/cpp/src/qpid/client/ReceivedContent.cpp +++ /dev/null @@ -1,105 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -#include "ReceivedContent.h" -#include "qpid/framing/all_method_bodies.h" - -using qpid::client::ReceivedContent; -using namespace qpid::framing; -using namespace boost; - -ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {} - -void ReceivedContent::append(AMQBody* part) -{ - parts.push_back(AMQFrame(ProtocolVersion(), 0, part)); -} - -bool ReceivedContent::isComplete() const -{ - if (parts.empty()) { - return false; - } else if (isA() || isA()) { - const AMQHeaderBody* headers(getHeaders()); - return headers && headers->getContentSize() == getContentSize(); - } else if (isA()) { - //no longer support references, headers and data are still method fields - return true; - } else { - throw Exception("Unknown content class"); - } -} - - -const AMQMethodBody* ReceivedContent::getMethod() const -{ - return parts.empty() ? 0 : dynamic_cast(parts[0].getBody()); -} - -const AMQHeaderBody* ReceivedContent::getHeaders() const -{ - return parts.size() < 2 ? 0 : dynamic_cast(parts[1].getBody()); -} - -uint64_t ReceivedContent::getContentSize() const -{ - if (isA() || isA()) { - uint64_t size(0); - for (uint i = 2; i < parts.size(); i++) { - size += parts[i].getBody()->size(); - } - return size; - } else if (isA()) { - return as()->getBody().getValue().size(); - } else { - throw Exception("Unknown content class"); - } -} - -std::string ReceivedContent::getContent() const -{ - if (isA() || isA()) { - string data; - for (uint i = 2; i < parts.size(); i++) { - data += static_cast(parts[i].getBody())->getData(); - } - return data; - } else if (isA()) { - return as()->getBody().getValue(); - } else { - throw Exception("Unknown content class"); - } -} - -void ReceivedContent::populate(Message& msg) -{ - if (!isComplete()) throw Exception("Incomplete message"); - - if (isA() || isA()) { - const BasicHeaderProperties* properties = dynamic_cast(getHeaders()->getProperties()); - BasicHeaderProperties::copy(msg, *properties); - msg.setData(getContent()); - } else if (isA()) { - throw Exception("Transfer not yet supported"); - } else { - throw Exception("Unknown content class"); - } -} diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h deleted file mode 100644 index 4f84039c10..0000000000 --- a/cpp/src/qpid/client/ReceivedContent.h +++ /dev/null @@ -1,75 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -#include -#include -#include "qpid/framing/amqp_framing.h" -#include "qpid/framing/AMQFrame.h" -#include "qpid/framing/SequenceNumber.h" -#include "ClientMessage.h" - -#ifndef _ReceivedContent_ -#define _ReceivedContent_ - -namespace qpid { -namespace client { - -/** - * Collects the frames representing some received 'content'. This - * provides a raw interface to 'message' data and attributes. - */ -class ReceivedContent -{ - const framing::SequenceNumber id; - std::vector parts; - -public: - typedef boost::shared_ptr shared_ptr; - - ReceivedContent(const framing::SequenceNumber& id); - void append(framing::AMQBody* part); - bool isComplete() const; - - uint64_t getContentSize() const; - std::string getContent() const; - - const framing::AMQMethodBody* getMethod() const; - const framing::AMQHeaderBody* getHeaders() const; - - template bool isA() const { - const framing::AMQMethodBody* method=getMethod(); - return method && method->isA(); - } - - template const T* as() const { - const framing::AMQMethodBody* method=getMethod(); - return (method && method->isA()) ? dynamic_cast(method) : 0; - } - - const framing::SequenceNumber& getId() const { return id; } - - void populate(Message& msg); -}; - -} -} - - -#endif diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp index f7ed7416cd..1b04e74af4 100644 --- a/cpp/src/qpid/client/SessionCore.cpp +++ b/cpp/src/qpid/client/SessionCore.cpp @@ -77,7 +77,7 @@ Response SessionCore::send(const AMQMethodBody& method, const MethodContent& con return Response(f); } -ReceivedContent::shared_ptr SessionCore::get() +FrameSet::shared_ptr SessionCore::get() { return l3.received.pop(); } diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h index bcbaf0028d..0febb956b9 100644 --- a/cpp/src/qpid/client/SessionCore.h +++ b/cpp/src/qpid/client/SessionCore.h @@ -25,11 +25,11 @@ #include #include "qpid/framing/AMQMethodBody.h" #include "qpid/framing/FrameHandler.h" +#include "qpid/framing/FrameSet.h" #include "qpid/framing/MethodContent.h" #include "ChannelHandler.h" #include "ExecutionHandler.h" #include "FutureFactory.h" -#include "ReceivedContent.h" #include "Response.h" namespace qpid { @@ -49,7 +49,7 @@ public: SessionCore(uint16_t id, boost::shared_ptr out, uint64_t maxFrameSize); Response send(const framing::AMQMethodBody& method, bool expectResponse = false); Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false); - ReceivedContent::shared_ptr get(); + framing::FrameSet::shared_ptr get(); uint16_t getId() const { return id; } void setSync(bool); bool isSync(); -- cgit v1.2.1