summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/client
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
committerGordon Sim <gsim@apache.org>2007-08-28 19:38:17 +0000
commit9e10f4ea3b2f8ab6650f635cada48e4735ca20d7 (patch)
tree26ad3b8dffa17fa665fe7a033a7c8092839df011 /cpp/src/qpid/client
parent6b09696b216c090b512c6af92bf7976ae3407add (diff)
downloadqpid-python-9e10f4ea3b2f8ab6650f635cada48e4735ca20d7.tar.gz
Updated message.transfer encoding to use header and content segments (including new structs).
Unified more between the basic and message classes messages. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@570538 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/client')
-rw-r--r--cpp/src/qpid/client/ChannelHandler.cpp4
-rw-r--r--cpp/src/qpid/client/ClientChannel.cpp8
-rw-r--r--cpp/src/qpid/client/ClientChannel.h2
-rw-r--r--cpp/src/qpid/client/ClientMessage.h16
-rw-r--r--cpp/src/qpid/client/ConnectionHandler.cpp2
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/Connector.cpp2
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.cpp21
-rw-r--r--cpp/src/qpid/client/ExecutionHandler.h6
-rw-r--r--cpp/src/qpid/client/ReceivedContent.cpp105
-rw-r--r--cpp/src/qpid/client/ReceivedContent.h75
-rw-r--r--cpp/src/qpid/client/SessionCore.cpp2
-rw-r--r--cpp/src/qpid/client/SessionCore.h4
13 files changed, 42 insertions, 207 deletions
diff --git a/cpp/src/qpid/client/ChannelHandler.cpp b/cpp/src/qpid/client/ChannelHandler.cpp
index b3d720baf0..754b0544c6 100644
--- a/cpp/src/qpid/client/ChannelHandler.cpp
+++ b/cpp/src/qpid/client/ChannelHandler.cpp
@@ -75,7 +75,7 @@ void ChannelHandler::open(uint16_t _id)
id = _id;
setState(OPENING);
- AMQFrame f(version, id, ChannelOpenBody(version));
+ AMQFrame f(id, ChannelOpenBody(version));
out(f);
std::set<int> states;
@@ -90,7 +90,7 @@ void ChannelHandler::open(uint16_t _id)
void ChannelHandler::close(uint16_t code, const std::string& message, uint16_t classId, uint16_t methodId)
{
setState(CLOSING);
- AMQFrame f(version, id, ChannelCloseBody(version, code, message, classId, methodId));
+ AMQFrame f(id, ChannelCloseBody(version, code, message, classId, methodId));
out(f);
}
diff --git a/cpp/src/qpid/client/ClientChannel.cpp b/cpp/src/qpid/client/ClientChannel.cpp
index d1cc4734eb..cc2b7aedc8 100644
--- a/cpp/src/qpid/client/ClientChannel.cpp
+++ b/cpp/src/qpid/client/ClientChannel.cpp
@@ -181,8 +181,8 @@ bool Channel::get(Message& msg, const Queue& queue, AckMode ackMode) {
if (response.isA<BasicGetEmptyBody>()) {
return false;
} else {
- ReceivedContent::shared_ptr content = gets.pop();
- content->populate(msg);
+ FrameSet::shared_ptr content = gets.pop();
+ msg.populate(*content);
return true;
}
}
@@ -232,13 +232,13 @@ void Channel::join() {
void Channel::run() {
try {
while (true) {
- ReceivedContent::shared_ptr content = session->get();
+ FrameSet::shared_ptr content = session->get();
//need to dispatch this to the relevant listener:
if (content->isA<BasicDeliverBody>()) {
ConsumerMap::iterator i = consumers.find(content->as<BasicDeliverBody>()->getConsumerTag());
if (i != consumers.end()) {
Message msg;
- content->populate(msg);
+ msg.populate(*content);
i->second.listener->received(msg);
} else {
QPID_LOG(warning, "Dropping message for unrecognised consumer: " << content->getMethod());
diff --git a/cpp/src/qpid/client/ClientChannel.h b/cpp/src/qpid/client/ClientChannel.h
index d73addc950..c355fe007a 100644
--- a/cpp/src/qpid/client/ClientChannel.h
+++ b/cpp/src/qpid/client/ClientChannel.h
@@ -83,7 +83,7 @@ class Channel : private sys::Runnable
std::auto_ptr<Session> session;
SessionCore::shared_ptr sessionCore;
framing::ChannelId channelId;
- BlockingQueue<ReceivedContent::shared_ptr> gets;
+ BlockingQueue<framing::FrameSet::shared_ptr> gets;
framing::Uuid uniqueId;
uint32_t nameCounter;
diff --git a/cpp/src/qpid/client/ClientMessage.h b/cpp/src/qpid/client/ClientMessage.h
index fd33fbc830..19b0f867bc 100644
--- a/cpp/src/qpid/client/ClientMessage.h
+++ b/cpp/src/qpid/client/ClientMessage.h
@@ -23,8 +23,13 @@
*/
#include <string>
#include "qpid/framing/BasicHeaderProperties.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
+#include "qpid/framing/BasicDeliverBody.h"
+#include "qpid/framing/BasicGetOkBody.h"
+#include "qpid/framing/MessageTransferBody.h"
+
namespace qpid {
namespace client {
@@ -55,6 +60,17 @@ class Message : public framing::BasicHeaderProperties, public framing::MethodCon
const HeaderProperties& getMethodHeaders() const { return *this; }
+
+ //TODO: move this elsewhere (GRS 24/08/2007)
+ void populate(framing::FrameSet& frameset)
+ {
+ const BasicHeaderProperties* properties = frameset.getHeaders()->get<BasicHeaderProperties>();
+ if (properties) {
+ BasicHeaderProperties::copy<Message, BasicHeaderProperties>(*this, *properties);
+ }
+ frameset.getContent(data);
+ }
+
private:
std::string data;
std::string destination;
diff --git a/cpp/src/qpid/client/ConnectionHandler.cpp b/cpp/src/qpid/client/ConnectionHandler.cpp
index 66db9384e2..40e13593ea 100644
--- a/cpp/src/qpid/client/ConnectionHandler.cpp
+++ b/cpp/src/qpid/client/ConnectionHandler.cpp
@@ -109,7 +109,7 @@ void ConnectionHandler::close()
void ConnectionHandler::send(const framing::AMQBody& body)
{
- AMQFrame f(ProtocolVersion(), 0, body);
+ AMQFrame f(0, body);
out(f);
}
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index e63ac69da6..b4d2156c31 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -107,7 +107,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
- AMQFrame frame(version, 0, new AMQHeartbeatBody());
+ AMQFrame frame(0, new AMQHeartbeatBody());
connector->send(frame);
}
diff --git a/cpp/src/qpid/client/Connector.cpp b/cpp/src/qpid/client/Connector.cpp
index b25f19e4ba..6e12a9c84f 100644
--- a/cpp/src/qpid/client/Connector.cpp
+++ b/cpp/src/qpid/client/Connector.cpp
@@ -180,7 +180,7 @@ void Connector::run(){
inbuf.move(received);
inbuf.flip();//position = 0, limit = total data read
- AMQFrame frame(version);
+ AMQFrame frame;
while(frame.decode(inbuf)){
QPID_LOG(trace, "RECV: " << frame);
input->received(frame);
diff --git a/cpp/src/qpid/client/ExecutionHandler.cpp b/cpp/src/qpid/client/ExecutionHandler.cpp
index 6c2600d00b..d10b3d3fe8 100644
--- a/cpp/src/qpid/client/ExecutionHandler.cpp
+++ b/cpp/src/qpid/client/ExecutionHandler.cpp
@@ -64,9 +64,9 @@ void ExecutionHandler::handle(AMQFrame& frame)
if (!invoke(body, this)) {
if (isContentFrame(frame)) {
if (!arriving) {
- arriving = ReceivedContent::shared_ptr(new ReceivedContent(++incoming.hwm));
+ arriving = FrameSet::shared_ptr(new FrameSet(++incoming.hwm));
}
- arriving->append(body);
+ arriving->append(frame);
if (arriving->isComplete()) {
received.push(arriving);
arriving.reset();
@@ -123,7 +123,7 @@ void ExecutionHandler::sync()
void ExecutionHandler::sendFlush()
{
- AMQFrame frame(version, 0, ExecutionFlushBody());
+ AMQFrame frame(0, ExecutionFlushBody());
out(frame);
}
@@ -139,8 +139,7 @@ void ExecutionHandler::send(const AMQBody& command, CompletionTracker::Listener
correlation.listen(g);
}
- AMQFrame frame(version, 0/*id will be filled in be channel handler*/,
- command);
+ AMQFrame frame(0/*id will be filled in be channel handler*/, command);
out(frame);
}
@@ -149,10 +148,10 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
{
send(command, f, g);
- AMQHeaderBody header(BASIC);
- BasicHeaderProperties::copy(*static_cast<BasicHeaderProperties*>(header.getProperties()), headers);
- header.setContentSize(data.size());
- AMQFrame h(version, 0, header);
+ AMQHeaderBody header;
+ BasicHeaderProperties::copy(*header.get<BasicHeaderProperties>(true), headers);
+ header.get<BasicHeaderProperties>(true)->setContentLength(data.size());
+ AMQFrame h(0, header);
out(h);
u_int64_t data_length = data.length();
@@ -160,7 +159,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
//frame itself uses 8 bytes
u_int32_t frag_size = maxFrameSize - 8;
if(data_length < frag_size){
- AMQFrame frame(version, 0, AMQContentBody(data));
+ AMQFrame frame(0, AMQContentBody(data));
out(frame);
}else{
u_int32_t offset = 0;
@@ -168,7 +167,7 @@ void ExecutionHandler::sendContent(const AMQBody& command, const BasicHeaderProp
while (remaining > 0) {
u_int32_t length = remaining > frag_size ? frag_size : remaining;
string frag(data.substr(offset, length));
- AMQFrame frame(version, 0, AMQContentBody(frag));
+ AMQFrame frame(0, AMQContentBody(frag));
out(frame);
offset += length;
remaining = data_length - offset;
diff --git a/cpp/src/qpid/client/ExecutionHandler.h b/cpp/src/qpid/client/ExecutionHandler.h
index b409d5df7b..f740e14185 100644
--- a/cpp/src/qpid/client/ExecutionHandler.h
+++ b/cpp/src/qpid/client/ExecutionHandler.h
@@ -23,12 +23,12 @@
#include <queue>
#include "qpid/framing/AMQP_ServerOperations.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/SequenceNumber.h"
#include "BlockingQueue.h"
#include "ChainableFrameHandler.h"
#include "CompletionTracker.h"
#include "Correlator.h"
-#include "ReceivedContent.h"
namespace qpid {
namespace client {
@@ -39,7 +39,7 @@ class ExecutionHandler :
{
framing::Window incoming;
framing::Window outgoing;
- ReceivedContent::shared_ptr arriving;
+ framing::FrameSet::shared_ptr arriving;
Correlator correlation;
CompletionTracker completion;
framing::ProtocolVersion version;
@@ -52,7 +52,7 @@ class ExecutionHandler :
void sync();
public:
- BlockingQueue<ReceivedContent::shared_ptr> received;
+ BlockingQueue<framing::FrameSet::shared_ptr> received;
ExecutionHandler(uint64_t maxFrameSize = 65536);
diff --git a/cpp/src/qpid/client/ReceivedContent.cpp b/cpp/src/qpid/client/ReceivedContent.cpp
deleted file mode 100644
index 5a1f48901a..0000000000
--- a/cpp/src/qpid/client/ReceivedContent.cpp
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "ReceivedContent.h"
-#include "qpid/framing/all_method_bodies.h"
-
-using qpid::client::ReceivedContent;
-using namespace qpid::framing;
-using namespace boost;
-
-ReceivedContent::ReceivedContent(const SequenceNumber& _id) : id(_id) {}
-
-void ReceivedContent::append(AMQBody* part)
-{
- parts.push_back(AMQFrame(ProtocolVersion(), 0, part));
-}
-
-bool ReceivedContent::isComplete() const
-{
- if (parts.empty()) {
- return false;
- } else if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const AMQHeaderBody* headers(getHeaders());
- return headers && headers->getContentSize() == getContentSize();
- } else if (isA<MessageTransferBody>()) {
- //no longer support references, headers and data are still method fields
- return true;
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-
-const AMQMethodBody* ReceivedContent::getMethod() const
-{
- return parts.empty() ? 0 : dynamic_cast<const AMQMethodBody*>(parts[0].getBody());
-}
-
-const AMQHeaderBody* ReceivedContent::getHeaders() const
-{
- return parts.size() < 2 ? 0 : dynamic_cast<const AMQHeaderBody*>(parts[1].getBody());
-}
-
-uint64_t ReceivedContent::getContentSize() const
-{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- uint64_t size(0);
- for (uint i = 2; i < parts.size(); i++) {
- size += parts[i].getBody()->size();
- }
- return size;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue().size();
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-std::string ReceivedContent::getContent() const
-{
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- string data;
- for (uint i = 2; i < parts.size(); i++) {
- data += static_cast<const AMQContentBody*>(parts[i].getBody())->getData();
- }
- return data;
- } else if (isA<MessageTransferBody>()) {
- return as<MessageTransferBody>()->getBody().getValue();
- } else {
- throw Exception("Unknown content class");
- }
-}
-
-void ReceivedContent::populate(Message& msg)
-{
- if (!isComplete()) throw Exception("Incomplete message");
-
- if (isA<BasicDeliverBody>() || isA<BasicGetOkBody>()) {
- const BasicHeaderProperties* properties = dynamic_cast<const BasicHeaderProperties*>(getHeaders()->getProperties());
- BasicHeaderProperties::copy<Message, BasicHeaderProperties>(msg, *properties);
- msg.setData(getContent());
- } else if (isA<MessageTransferBody>()) {
- throw Exception("Transfer not yet supported");
- } else {
- throw Exception("Unknown content class");
- }
-}
diff --git a/cpp/src/qpid/client/ReceivedContent.h b/cpp/src/qpid/client/ReceivedContent.h
deleted file mode 100644
index 4f84039c10..0000000000
--- a/cpp/src/qpid/client/ReceivedContent.h
+++ /dev/null
@@ -1,75 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include <string>
-#include <vector>
-#include "qpid/framing/amqp_framing.h"
-#include "qpid/framing/AMQFrame.h"
-#include "qpid/framing/SequenceNumber.h"
-#include "ClientMessage.h"
-
-#ifndef _ReceivedContent_
-#define _ReceivedContent_
-
-namespace qpid {
-namespace client {
-
-/**
- * Collects the frames representing some received 'content'. This
- * provides a raw interface to 'message' data and attributes.
- */
-class ReceivedContent
-{
- const framing::SequenceNumber id;
- std::vector<framing::AMQFrame> parts;
-
-public:
- typedef boost::shared_ptr<ReceivedContent> shared_ptr;
-
- ReceivedContent(const framing::SequenceNumber& id);
- void append(framing::AMQBody* part);
- bool isComplete() const;
-
- uint64_t getContentSize() const;
- std::string getContent() const;
-
- const framing::AMQMethodBody* getMethod() const;
- const framing::AMQHeaderBody* getHeaders() const;
-
- template <class T> bool isA() const {
- const framing::AMQMethodBody* method=getMethod();
- return method && method->isA<T>();
- }
-
- template <class T> const T* as() const {
- const framing::AMQMethodBody* method=getMethod();
- return (method && method->isA<T>()) ? dynamic_cast<const T*>(method) : 0;
- }
-
- const framing::SequenceNumber& getId() const { return id; }
-
- void populate(Message& msg);
-};
-
-}
-}
-
-
-#endif
diff --git a/cpp/src/qpid/client/SessionCore.cpp b/cpp/src/qpid/client/SessionCore.cpp
index f7ed7416cd..1b04e74af4 100644
--- a/cpp/src/qpid/client/SessionCore.cpp
+++ b/cpp/src/qpid/client/SessionCore.cpp
@@ -77,7 +77,7 @@ Response SessionCore::send(const AMQMethodBody& method, const MethodContent& con
return Response(f);
}
-ReceivedContent::shared_ptr SessionCore::get()
+FrameSet::shared_ptr SessionCore::get()
{
return l3.received.pop();
}
diff --git a/cpp/src/qpid/client/SessionCore.h b/cpp/src/qpid/client/SessionCore.h
index bcbaf0028d..0febb956b9 100644
--- a/cpp/src/qpid/client/SessionCore.h
+++ b/cpp/src/qpid/client/SessionCore.h
@@ -25,11 +25,11 @@
#include <boost/shared_ptr.hpp>
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/FrameHandler.h"
+#include "qpid/framing/FrameSet.h"
#include "qpid/framing/MethodContent.h"
#include "ChannelHandler.h"
#include "ExecutionHandler.h"
#include "FutureFactory.h"
-#include "ReceivedContent.h"
#include "Response.h"
namespace qpid {
@@ -49,7 +49,7 @@ public:
SessionCore(uint16_t id, boost::shared_ptr<framing::FrameHandler> out, uint64_t maxFrameSize);
Response send(const framing::AMQMethodBody& method, bool expectResponse = false);
Response send(const framing::AMQMethodBody& method, const framing::MethodContent& content, bool expectResponse = false);
- ReceivedContent::shared_ptr get();
+ framing::FrameSet::shared_ptr get();
uint16_t getId() const { return id; }
void setSync(bool);
bool isSync();