summaryrefslogtreecommitdiff
path: root/cpp/src/qpid
diff options
context:
space:
mode:
Diffstat (limited to 'cpp/src/qpid')
-rw-r--r--cpp/src/qpid/RefCounted.h4
-rw-r--r--cpp/src/qpid/broker/DeliveryRecord.cpp2
-rw-r--r--cpp/src/qpid/broker/Message.cpp5
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp3
-rw-r--r--cpp/src/qpid/broker/SessionState.cpp3
-rw-r--r--cpp/src/qpid/client/ConnectionImpl.cpp2
-rw-r--r--cpp/src/qpid/client/SessionImpl.cpp4
-rw-r--r--cpp/src/qpid/cluster/Connection.cpp2
-rw-r--r--cpp/src/qpid/framing/AMQBody.h13
-rw-r--r--cpp/src/qpid/framing/AMQContentBody.h1
-rw-r--r--cpp/src/qpid/framing/AMQFrame.cpp45
-rw-r--r--cpp/src/qpid/framing/AMQFrame.h31
-rw-r--r--cpp/src/qpid/framing/AMQHeaderBody.h2
-rw-r--r--cpp/src/qpid/framing/AMQHeartbeatBody.h1
-rw-r--r--cpp/src/qpid/framing/Array.cpp1
-rw-r--r--cpp/src/qpid/framing/Blob.h197
-rw-r--r--cpp/src/qpid/framing/BodyFactory.h47
-rw-r--r--cpp/src/qpid/framing/BodyHolder.cpp76
-rw-r--r--cpp/src/qpid/framing/BodyHolder.h88
-rw-r--r--cpp/src/qpid/framing/FieldTable.cpp1
-rw-r--r--cpp/src/qpid/framing/MethodBodyFactory.h44
-rw-r--r--cpp/src/qpid/framing/SendContent.cpp3
-rw-r--r--cpp/src/qpid/framing/SequenceNumberSet.cpp1
-rw-r--r--cpp/src/qpid/framing/SequenceSet.cpp1
-rw-r--r--cpp/src/qpid/management/ManagementBroker.cpp7
-rw-r--r--cpp/src/qpid/replication/ReplicatingEventListener.cpp6
-rw-r--r--cpp/src/qpid/sys/Thread.h4
27 files changed, 173 insertions, 421 deletions
diff --git a/cpp/src/qpid/RefCounted.h b/cpp/src/qpid/RefCounted.h
index 10b5e4afcc..e7a9bf31c1 100644
--- a/cpp/src/qpid/RefCounted.h
+++ b/cpp/src/qpid/RefCounted.h
@@ -39,11 +39,13 @@ class RefCounted : boost::noncopyable {
public:
RefCounted() : count(0) {}
void addRef() const { ++count; }
- void release() const { if (--count==0) delete this; }
+ void release() const { if (--count==0) released(); }
long refCount() { return count; }
protected:
virtual ~RefCounted() {};
+ // Allow subclasses to over-ride behavior when refcount reaches 0.
+ virtual void released() const { delete this; }
};
diff --git a/cpp/src/qpid/broker/DeliveryRecord.cpp b/cpp/src/qpid/broker/DeliveryRecord.cpp
index 90ec67f477..432a44ec63 100644
--- a/cpp/src/qpid/broker/DeliveryRecord.cpp
+++ b/cpp/src/qpid/broker/DeliveryRecord.cpp
@@ -93,7 +93,7 @@ void DeliveryRecord::deliver(framing::FrameHandler& h, DeliveryId deliveryId, ui
msg.payload->getProperties<framing::DeliveryProperties>()->setRedelivered(true);
}
- framing::AMQFrame method(framing::in_place<framing::MessageTransferBody>(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1));
+ framing::AMQFrame method((framing::MessageTransferBody(framing::ProtocolVersion(), tag, acceptExpected ? 0 : 1, acquired ? 0 : 1)));
method.setEof(false);
h.handle(method);
msg.payload->sendHeader(h, framesize);
diff --git a/cpp/src/qpid/broker/Message.cpp b/cpp/src/qpid/broker/Message.cpp
index 89f2653a21..6bcee99f49 100644
--- a/cpp/src/qpid/broker/Message.cpp
+++ b/cpp/src/qpid/broker/Message.cpp
@@ -157,8 +157,7 @@ void Message::decodeContent(framing::Buffer& buffer)
if (buffer.available()) {
//get the data as a string and set that as the content
//body on a frame then add that frame to the frameset
- AMQFrame frame;
- frame.setBody(AMQContentBody());
+ AMQFrame frame((AMQContentBody()));
frame.castBody<AMQContentBody>()->decode(buffer, buffer.available());
frames.append(frame);
} else {
@@ -208,7 +207,7 @@ void Message::sendContent(Queue& queue, framing::FrameHandler& out, uint16_t max
bool done = false;
for (uint64_t offset = 0; !done; offset += maxContentSize)
{
- AMQFrame frame(in_place<AMQContentBody>());
+ AMQFrame frame((AMQContentBody()));
string& data = frame.castBody<AMQContentBody>()->getData();
store->loadContent(queue, pmsg, data, offset, maxContentSize);
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 8f0e3344d5..1e988021b2 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -49,8 +49,7 @@ void MessageBuilder::handle(AMQFrame& frame)
if (type == CONTENT_BODY) {
//TODO: rethink how to handle non-existent headers(?)...
//didn't get a header: add in a dummy
- AMQFrame header;
- header.setBody(AMQHeaderBody());
+ AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
message->getFrames().append(header);
diff --git a/cpp/src/qpid/broker/SessionState.cpp b/cpp/src/qpid/broker/SessionState.cpp
index d0804d66b9..e17a813db7 100644
--- a/cpp/src/qpid/broker/SessionState.cpp
+++ b/cpp/src/qpid/broker/SessionState.cpp
@@ -188,8 +188,7 @@ void SessionState::handleContent(AMQFrame& frame, const SequenceNumber& id)
if (frame.getEof() && frame.getEos()) {//end of frameset
if (frame.getBof()) {
//i.e this is a just a command frame, add a dummy header
- AMQFrame header;
- header.setBody(AMQHeaderBody());
+ AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(false);
msg->getFrames().append(header);
diff --git a/cpp/src/qpid/client/ConnectionImpl.cpp b/cpp/src/qpid/client/ConnectionImpl.cpp
index d0e47b774f..8e27d78479 100644
--- a/cpp/src/qpid/client/ConnectionImpl.cpp
+++ b/cpp/src/qpid/client/ConnectionImpl.cpp
@@ -164,7 +164,7 @@ void ConnectionImpl::idleIn()
void ConnectionImpl::idleOut()
{
- AMQFrame frame(in_place<AMQHeartbeatBody>());
+ AMQFrame frame((AMQHeartbeatBody()));
connector->send(frame);
}
diff --git a/cpp/src/qpid/client/SessionImpl.cpp b/cpp/src/qpid/client/SessionImpl.cpp
index ab8c1bddb8..4fadf236f8 100644
--- a/cpp/src/qpid/client/SessionImpl.cpp
+++ b/cpp/src/qpid/client/SessionImpl.cpp
@@ -364,7 +364,7 @@ void SessionImpl::sendContent(const MethodContent& content)
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
if(data_length < frag_size){
- AMQFrame frame(in_place<AMQContentBody>(content.getData()));
+ AMQFrame frame((AMQContentBody(content.getData())));
frame.setFirstSegment(false);
handleOut(frame);
}else{
@@ -373,7 +373,7 @@ void SessionImpl::sendContent(const MethodContent& content)
while (remaining > 0) {
uint32_t length = remaining > frag_size ? frag_size : remaining;
string frag(content.getData().substr(offset, length));
- AMQFrame frame(in_place<AMQContentBody>(frag));
+ AMQFrame frame((AMQContentBody(frag)));
frame.setFirstSegment(false);
frame.setLastSegment(true);
if (offset > 0) {
diff --git a/cpp/src/qpid/cluster/Connection.cpp b/cpp/src/qpid/cluster/Connection.cpp
index a255acfc1f..3988abd491 100644
--- a/cpp/src/qpid/cluster/Connection.cpp
+++ b/cpp/src/qpid/cluster/Connection.cpp
@@ -109,7 +109,7 @@ void Connection::received(framing::AMQFrame& f) {
QPID_LOG(debug, cluster << " inserting connection " << *this);
cluster.insert(boost::intrusive_ptr<Connection>(this));
}
- AMQFrame ok(in_place<ConnectionCloseOkBody>());
+ AMQFrame ok((ConnectionCloseOkBody()));
connection.getOutput().send(ok);
output.setOutputHandler(discardHandler);
catchUp = false;
diff --git a/cpp/src/qpid/framing/AMQBody.h b/cpp/src/qpid/framing/AMQBody.h
index 93f4319575..9e66b9738f 100644
--- a/cpp/src/qpid/framing/AMQBody.h
+++ b/cpp/src/qpid/framing/AMQBody.h
@@ -22,7 +22,9 @@
*
*/
#include "qpid/framing/amqp_types.h"
-
+#include "qpid/RefCounted.h"
+#include "qpid/framing/BodyFactory.h"
+#include <boost/intrusive_ptr.hpp>
#include <ostream>
namespace qpid {
@@ -43,11 +45,15 @@ struct AMQBodyConstVisitor {
virtual void visit(const AMQMethodBody&) = 0;
};
-class AMQBody
-{
+class AMQBody : public RefCounted {
public:
+ AMQBody() {}
virtual ~AMQBody();
+ // Make AMQBody copyable even though RefCounted.
+ AMQBody(const AMQBody&) : RefCounted() {}
+ AMQBody& operator=(const AMQBody&) { return *this; }
+
virtual uint8_t type() const = 0;
virtual void encode(Buffer& buffer) const = 0;
@@ -62,6 +68,7 @@ class AMQBody
/** Match if same type and same class/method ID for methods */
static bool match(const AMQBody& , const AMQBody& );
+ virtual boost::intrusive_ptr<AMQBody> clone() const = 0;
};
std::ostream& operator<<(std::ostream& out, const AMQBody& body) ;
diff --git a/cpp/src/qpid/framing/AMQContentBody.h b/cpp/src/qpid/framing/AMQContentBody.h
index 288f1549a9..a81cf36168 100644
--- a/cpp/src/qpid/framing/AMQContentBody.h
+++ b/cpp/src/qpid/framing/AMQContentBody.h
@@ -44,6 +44,7 @@ public:
void decode(Buffer& buffer, uint32_t size);
void print(std::ostream& out) const;
void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+ boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
}
diff --git a/cpp/src/qpid/framing/AMQFrame.cpp b/cpp/src/qpid/framing/AMQFrame.cpp
index 98a1354811..7ef0db08cb 100644
--- a/cpp/src/qpid/framing/AMQFrame.cpp
+++ b/cpp/src/qpid/framing/AMQFrame.cpp
@@ -20,25 +20,31 @@
*/
#include "AMQFrame.h"
-#include "qpid/framing/variant.h"
#include "qpid/framing/AMQMethodBody.h"
#include "qpid/framing/reply_exceptions.h"
-
+#include "qpid/framing/BodyFactory.h"
+#include "qpid/framing/MethodBodyFactory.h"
#include <boost/format.hpp>
-
#include <iostream>
namespace qpid {
namespace framing {
-AMQFrame::~AMQFrame() {}
+void AMQFrame::init() { bof = eof = bos = eos = true; subchannel=0; channel=0; }
+
+AMQFrame::AMQFrame(const boost::intrusive_ptr<AMQBody>& b) : body(b) { init(); }
+
+AMQFrame::AMQFrame(const AMQBody& b) : body(b.clone()) { init(); }
-void AMQFrame::setBody(const AMQBody& b) { body = new BodyHolder(b); }
+AMQFrame::~AMQFrame() {}
-void AMQFrame::setMethod(ClassId c, MethodId m) { body = new BodyHolder(c,m); }
+void AMQFrame::setMethod(ClassId c, MethodId m) { body = MethodBodyFactory::create(c,m); }
uint32_t AMQFrame::encodedSize() const {
- return frameOverhead() + body->encodedSize();
+ uint32_t size = frameOverhead() + body->encodedSize();
+ if (body->getMethod())
+ size += sizeof(ClassId)+sizeof(MethodId);
+ return size;
}
uint32_t AMQFrame::frameOverhead() {
@@ -65,6 +71,11 @@ void AMQFrame::encode(Buffer& buffer) const
buffer.putOctet(0x0f & track);
buffer.putShort(channel);
buffer.putLong(0);
+ const AMQMethodBody* method=getMethod();
+ if (method) {
+ buffer.putOctet(method->amqpClassId());
+ buffer.putOctet(method->amqpMethodId());
+ }
body->encode(buffer);
}
@@ -105,8 +116,24 @@ bool AMQFrame::decode(Buffer& buffer)
buffer.restore();
return false;
}
- body = new BodyHolder();
- body->decode(type,buffer, body_size);
+
+ switch(type)
+ {
+ case 0://CONTROL
+ case METHOD_BODY: {
+ ClassId c = buffer.getOctet();
+ MethodId m = buffer.getOctet();
+ body = MethodBodyFactory::create(c, m);
+ break;
+ }
+ case HEADER_BODY: body = BodyFactory::create<AMQHeaderBody>(); break;
+ case CONTENT_BODY: body = BodyFactory::create<AMQContentBody>(); break;
+ case HEARTBEAT_BODY: body = BodyFactory::create<AMQHeartbeatBody>(); break;
+ default:
+ throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type));
+ }
+ body->decode(buffer, body_size);
+
return true;
}
diff --git a/cpp/src/qpid/framing/AMQFrame.h b/cpp/src/qpid/framing/AMQFrame.h
index 7bf4638089..f1ea2f8b82 100644
--- a/cpp/src/qpid/framing/AMQFrame.h
+++ b/cpp/src/qpid/framing/AMQFrame.h
@@ -26,47 +26,29 @@
#include "AMQContentBody.h"
#include "AMQHeartbeatBody.h"
#include "ProtocolVersion.h"
-#include "BodyHolder.h"
#include "qpid/sys/LatencyMetric.h"
-
#include <boost/intrusive_ptr.hpp>
#include <boost/cast.hpp>
namespace qpid {
namespace framing {
-class BodyHolder;
-
class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
{
public:
- AMQFrame(boost::intrusive_ptr<BodyHolder> b=0) : body(b) { init(); }
- AMQFrame(const AMQBody& b) { setBody(b); init(); }
+ AMQFrame(const boost::intrusive_ptr<AMQBody>& b=0);
+ AMQFrame(const AMQBody& b);
~AMQFrame();
- template <class InPlace>
- AMQFrame(const InPlace& ip, typename EnableInPlace<InPlace>::type* =0) {
- init(); setBody(ip);
- }
-
ChannelId getChannel() const { return channel; }
void setChannel(ChannelId c) { channel = c; }
- boost::intrusive_ptr<BodyHolder> getHolder() { return body; }
-
- AMQBody* getBody() { return body ? body->get() : 0; }
- const AMQBody* getBody() const { return body ? body->get() : 0; }
+ AMQBody* getBody() { return body.get(); }
+ const AMQBody* getBody() const { return body.get(); }
AMQMethodBody* getMethod() { return getBody()->getMethod(); }
const AMQMethodBody* getMethod() const { return getBody()->getMethod(); }
- void setBody(const AMQBody& b);
-
- template <class InPlace>
- typename EnableInPlace<InPlace>::type setBody(const InPlace& ip) {
- body = new BodyHolder(ip);
- }
-
void setMethod(ClassId c, MethodId m);
template <class T> T* castBody() {
@@ -109,10 +91,11 @@ class AMQFrame : public AMQDataBlock, public sys::LatencyMetricTimestamp
static uint32_t frameOverhead();
/** Must point to at least DECODE_SIZE_MIN bytes of data */
static uint16_t decodeSize(char* data);
+
private:
- void init() { bof = eof = bos = eos = true; subchannel=0; channel=0; }
+ void init();
- boost::intrusive_ptr<BodyHolder> body;
+ boost::intrusive_ptr<AMQBody> body;
uint16_t channel : 16;
uint8_t subchannel : 8;
bool bof : 1;
diff --git a/cpp/src/qpid/framing/AMQHeaderBody.h b/cpp/src/qpid/framing/AMQHeaderBody.h
index 7ddb7d89cf..9846544949 100644
--- a/cpp/src/qpid/framing/AMQHeaderBody.h
+++ b/cpp/src/qpid/framing/AMQHeaderBody.h
@@ -99,6 +99,8 @@ public:
template <class T> const T* get() const {
return properties.OptProps<T>::props.get_ptr();
}
+
+ boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
}}
diff --git a/cpp/src/qpid/framing/AMQHeartbeatBody.h b/cpp/src/qpid/framing/AMQHeartbeatBody.h
index 7b42b46e5c..3fb41c128e 100644
--- a/cpp/src/qpid/framing/AMQHeartbeatBody.h
+++ b/cpp/src/qpid/framing/AMQHeartbeatBody.h
@@ -38,6 +38,7 @@ public:
inline void decode(Buffer& , uint32_t /*size*/) {}
virtual void print(std::ostream& out) const;
void accept(AMQBodyConstVisitor& v) const { v.visit(*this); }
+ boost::intrusive_ptr<AMQBody> clone() const { return BodyFactory::copy(*this); }
};
}
diff --git a/cpp/src/qpid/framing/Array.cpp b/cpp/src/qpid/framing/Array.cpp
index 9f072f7b05..a4eb5a29ab 100644
--- a/cpp/src/qpid/framing/Array.cpp
+++ b/cpp/src/qpid/framing/Array.cpp
@@ -75,6 +75,7 @@ void Array::encode(Buffer& buffer) const{
}
void Array::decode(Buffer& buffer){
+ values.clear();
uint32_t size = buffer.getLong();//size added only when array is a top-level type
uint32_t available = buffer.available();
if (available < size) {
diff --git a/cpp/src/qpid/framing/Blob.h b/cpp/src/qpid/framing/Blob.h
index 5c84384ad7..e69de29bb2 100644
--- a/cpp/src/qpid/framing/Blob.h
+++ b/cpp/src/qpid/framing/Blob.h
@@ -1,197 +0,0 @@
-#ifndef QPID_FRAMING_BLOB_H
-#define QPID_FRAMING_BLOB_H
-
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include <boost/static_assert.hpp>
-#include <boost/aligned_storage.hpp>
-#include <boost/checked_delete.hpp>
-#include <boost/utility/typed_in_place_factory.hpp>
-#include <boost/type_traits/is_base_and_derived.hpp>
-#include <boost/utility/enable_if.hpp>
-#include <boost/version.hpp>
-
-#include <new>
-
-#include <assert.h>
-
-
-namespace qpid {
-namespace framing {
-
-using boost::in_place;
-using boost::typed_in_place_factory_base;
-
-/** 0-arg typed_in_place_factory, missing in pre-1.35 boost. */
-#if (BOOST_VERSION < 103500)
-template <class T>
-struct typed_in_place_factory0 : public typed_in_place_factory_base {
- typedef T value_type ;
- void apply ( void* address ) const { new (address) T(); }
-};
-
-/** 0-arg in_place<T>() function, missing from boost. */
-template<class T>
-typed_in_place_factory0<T> in_place() { return typed_in_place_factory0<T>(); }
-#endif
-
-template <class T, class R=void>
-struct EnableInPlace
- : public boost::enable_if<boost::is_base_and_derived<
- typed_in_place_factory_base, T>,
- R>
-{};
-
-template <class T, class R=void>
-struct DisableInPlace
- : public boost::disable_if<boost::is_base_and_derived<
- typed_in_place_factory_base, T>,
- R>
-{};
-
-template <class T> struct BlobHelper {
- static void destroy(void* ptr) { static_cast<T*>(ptr)->~T(); }
- static void copy(void* dest, const void* src) {
- new (dest) T(*static_cast<const T*>(src));
- }
-};
-
-template <> struct BlobHelper<void> {
- static void destroy(void*);
- static void copy(void* to, const void* from);
-};
-
-/**
- * A Blob is a chunk of memory which can contain a single object at
- * a time-arbitrary type, provided sizeof(T)<=blob.size(). Using Blobs
- * ensures proper construction and destruction of its contents,
- * and proper copying between Blobs, but nothing else.
- *
- * In particular you must ensure that the Blob is big enough for its
- * contents and must know the type of object in the Blob to cast get().
- *
- * If BaseType is specified then only an object that can be
- * static_cast to BaseType may be stored in the Blob.
- */
-template <size_t Size, class BaseType=void>
-class Blob
-{
- boost::aligned_storage<Size> store;
- BaseType* basePtr;
-
- void (*destroy)(void*);
- void (*copy)(void*, const void*);
-
- template <class T>void setType() {
- BOOST_STATIC_ASSERT(sizeof(T) <= Size);
- destroy=&BlobHelper<T>::destroy;
- copy=&BlobHelper<T>::copy;
- // Base pointer may be offeset from store.address()
- basePtr = reinterpret_cast<T*>(store.address());
- }
-
- void initialize() {
- destroy=&BlobHelper<void>::destroy;
- copy=&BlobHelper<void>::copy;
- basePtr=0;
- }
-
- template<class Factory>
- typename EnableInPlace<Factory>::type apply(const Factory& factory)
- {
- typedef typename Factory::value_type T;
- assert(empty());
- factory.apply(store.address());
- setType<T>();
- }
-
- void assign(const Blob& b) {
- assert(empty());
- if (b.empty()) return;
- b.copy(this->store.address(), b.store.address());
- copy = b.copy;
- destroy = b.destroy;
- basePtr = reinterpret_cast<BaseType*>(
- ((char*)this)+ ((const char*)(b.basePtr) - (const char*)(&b)));
- }
-
- public:
- /** Construct an empty Blob. */
- Blob() { initialize(); }
-
- /** Copy a Blob. */
- Blob(const Blob& b) { initialize(); assign(b); }
-
- /** Construct from in_place constructor. */
- template<class InPlace>
- Blob(const InPlace & expr, typename EnableInPlace<InPlace>::type* =0) {
- initialize(); apply(expr);
- }
-
- /** Construct by copying an object constructor. */
- template<class T>
- Blob(const T & t, typename DisableInPlace<T>::type* =0) {
- initialize(); apply(in_place<T>(t));
- }
-
- ~Blob() { clear(); }
-
- /** Assign from another Blob. */
- Blob& operator=(const Blob& b) {
- clear();
- assign(b);
- return *this;
- }
-
- /** Assign from an in_place constructor expression. */
- template<class InPlace>
- typename EnableInPlace<InPlace,Blob&>::type operator=(const InPlace& expr) {
- clear(); apply(expr); return *this;
- }
-
- /** Assign from an object of type T. */
- template <class T>
- typename DisableInPlace<T, Blob&>::type operator=(const T& x) {
- clear(); apply(in_place<T>(x)); return *this;
- }
-
- /** Get pointer to Blob contents, returns 0 if empty. */
- BaseType* get() { return basePtr; }
-
- /** Get pointer to Blob contents, returns 0 if empty. */
- const BaseType* get() const { return basePtr; }
-
- /** Destroy the object in the Blob making it empty. */
- void clear() {
- void (*oldDestroy)(void*) = destroy;
- initialize();
- oldDestroy(store.address());
- }
-
- bool empty() const { return destroy==BlobHelper<void>::destroy; }
-
- static size_t size() { return Size; }
-};
-
-}} // namespace qpid::framing
-
-
-#endif /*!QPID_FRAMING_BLOB_H*/
diff --git a/cpp/src/qpid/framing/BodyFactory.h b/cpp/src/qpid/framing/BodyFactory.h
new file mode 100644
index 0000000000..6a8d9b1988
--- /dev/null
+++ b/cpp/src/qpid/framing/BodyFactory.h
@@ -0,0 +1,47 @@
+#ifndef QPID_FRAMING_BODYFACTORY_H
+#define QPID_FRAMING_BODYFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace framing {
+
+/**
+ * Indirect creation of body types to allow centralized changes to
+ * memory management strategy.
+ */
+class BodyFactory {
+ public:
+ template <class BodyType> static boost::intrusive_ptr<BodyType> create() {
+ return new BodyType;
+ }
+
+ template <class BodyType> static boost::intrusive_ptr<BodyType> copy(const BodyType& body) {
+ return new BodyType(body);
+ }
+};
+
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_BODYFACTORY_H*/
diff --git a/cpp/src/qpid/framing/BodyHolder.cpp b/cpp/src/qpid/framing/BodyHolder.cpp
index b30a52a38e..e69de29bb2 100644
--- a/cpp/src/qpid/framing/BodyHolder.cpp
+++ b/cpp/src/qpid/framing/BodyHolder.cpp
@@ -1,76 +0,0 @@
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-#include "BodyHolder.h"
-#include "AMQMethodBody.h"
-#include "AMQHeaderBody.h"
-#include "AMQContentBody.h"
-#include "AMQHeartbeatBody.h"
-#include "Buffer.h"
-#include "qpid/framing/reply_exceptions.h"
-
-namespace qpid {
-namespace framing {
-
-
-// BodyHolder::operator=(const AMQBody&) is defined
-// in generated file BodyHolder_gen.cpp
-
-
-void BodyHolder::encode(Buffer& b) const {
- const AMQMethodBody* method=getMethod();
- if (method) {
- b.putOctet(method->amqpClassId());
- b.putOctet(method->amqpMethodId());
- method->encode(b);
- }
- else
- get()->encode(b);
-}
-
-void BodyHolder::decode(uint8_t type, Buffer& buffer, uint32_t size) {
- switch(type)
- {
- case 0://CONTROL
- case METHOD_BODY: {
- ClassId c = buffer.getOctet();
- MethodId m = buffer.getOctet();
- setMethod(c, m);
- break;
- }
- case HEADER_BODY: *this=in_place<AMQHeaderBody>(); break;
- case CONTENT_BODY: *this=in_place<AMQContentBody>(); break;
- case HEARTBEAT_BODY: *this=in_place<AMQHeartbeatBody>(); break;
- default:
- throw IllegalArgumentException(QPID_MSG("Invalid frame type " << type));
- }
- get()->decode(buffer, size);
-}
-
-uint32_t BodyHolder::encodedSize() const {
- const AMQMethodBody* method=getMethod();
- if (method)
- return sizeof(ClassId)+sizeof(MethodId)+method->encodedSize();
- else
- return get()->encodedSize();
-}
-
-}} // namespace qpid::framing
-
diff --git a/cpp/src/qpid/framing/BodyHolder.h b/cpp/src/qpid/framing/BodyHolder.h
index 0a35bb5e99..e69de29bb2 100644
--- a/cpp/src/qpid/framing/BodyHolder.h
+++ b/cpp/src/qpid/framing/BodyHolder.h
@@ -1,88 +0,0 @@
-#ifndef QPID_FRAMING_BODYHOLDER_H
-#define QPID_FRAMING_BODYHOLDER_H
-
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
-#include "qpid/framing/AMQBody.h"
-#include "qpid/framing/Blob.h"
-#include "qpid/framing/MaxMethodBodySize.h" // Generated file.
-#include "qpid/framing/amqp_types.h"
-#include "qpid/RefCounted.h"
-
-
-namespace qpid {
-namespace framing {
-
-class AMQMethodBody;
-class AMQBody;
-class Buffer;
-
-/**
- * Holder for arbitrary frame body.
- */
-class BodyHolder : public RefCounted
-{
- public:
- // default copy, assign dtor ok.
- BodyHolder() {}
- BodyHolder(const AMQBody& b) { setBody(b); }
- BodyHolder(ClassId c, MethodId m) { setMethod(c,m); }
-
- /** Construct from an in_place constructor expression. */
- template <class InPlace>
- BodyHolder(const InPlace& ip, typename EnableInPlace<InPlace>::type* =0)
- : blob(ip) {}
-
- void setBody(const AMQBody& b);
-
- /** Assign from an in_place constructor expression. */
- template <class InPlace>
- typename EnableInPlace<InPlace,BodyHolder&>::type
- operator=(const InPlace& ip) { blob=ip; return *this; }
-
- /** Assign by copying. */
- template <class T>
- typename DisableInPlace<T,BodyHolder&>::type operator=(const T& x)
- { blob=in_place<T>(x); return *this; }
-
- /** Set to method with ClassId c, MethodId m. */
- void setMethod(ClassId c, MethodId m);
-
- void encode(Buffer&) const;
- void decode(uint8_t frameType, Buffer&, uint32_t=0);
- uint32_t encodedSize() const;
-
- /** Return body pointer or 0 if empty. */
- AMQBody* get() { return blob.get(); }
- const AMQBody* get() const { return blob.get(); }
-
- /** Return method pointer or 0 if not a method. */
- AMQMethodBody* getMethod() { return get()->getMethod(); }
- const AMQMethodBody* getMethod() const { return get()->getMethod(); }
-
- private:
- Blob<MAX_METHOD_BODY_SIZE, AMQBody> blob;
-};
-
-}} // namespace qpid::framing
-
-#endif /*!QPID_FRAMING_BODYHOLDER_H*/
diff --git a/cpp/src/qpid/framing/FieldTable.cpp b/cpp/src/qpid/framing/FieldTable.cpp
index 7260261970..559aa8b013 100644
--- a/cpp/src/qpid/framing/FieldTable.cpp
+++ b/cpp/src/qpid/framing/FieldTable.cpp
@@ -201,6 +201,7 @@ void FieldTable::encode(Buffer& buffer) const{
}
void FieldTable::decode(Buffer& buffer){
+ clear();
uint32_t len = buffer.getLong();
if (len) {
uint32_t available = buffer.available();
diff --git a/cpp/src/qpid/framing/MethodBodyFactory.h b/cpp/src/qpid/framing/MethodBodyFactory.h
new file mode 100644
index 0000000000..da20407b6e
--- /dev/null
+++ b/cpp/src/qpid/framing/MethodBodyFactory.h
@@ -0,0 +1,44 @@
+#ifndef QPID_FRAMING_METHODBODYFACTORY_H
+#define QPID_FRAMING_METHODBODYFACTORY_H
+
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+#include "amqp_types.h"
+#include <boost/intrusive_ptr.hpp>
+
+namespace qpid {
+namespace framing {
+
+class AMQMethodBody;
+
+/**
+ * Functions to create instances of AMQMethodBody sub-classes.
+ * Note: MethodBodyFactory.cpp file is generated by rubygen.
+ */
+class MethodBodyFactory
+{
+ public:
+ static boost::intrusive_ptr<AMQMethodBody> create(ClassId c, MethodId m);
+};
+
+}} // namespace qpid::framing
+
+#endif /*!QPID_FRAMING_METHODBODYFACTORY_H*/
diff --git a/cpp/src/qpid/framing/SendContent.cpp b/cpp/src/qpid/framing/SendContent.cpp
index f390106dee..2b8287a5c2 100644
--- a/cpp/src/qpid/framing/SendContent.cpp
+++ b/cpp/src/qpid/framing/SendContent.cpp
@@ -53,8 +53,7 @@ void qpid::framing::SendContent::operator()(const AMQFrame& f)
void qpid::framing::SendContent::sendFragment(const AMQContentBody& body, uint32_t offset, uint16_t size, bool first, bool last) const
{
- AMQFrame fragment(in_place<AMQContentBody>(
- body.getData().substr(offset, size)));
+ AMQFrame fragment((AMQContentBody(body.getData().substr(offset, size))));
setFlags(fragment, first, last);
handler.handle(fragment);
}
diff --git a/cpp/src/qpid/framing/SequenceNumberSet.cpp b/cpp/src/qpid/framing/SequenceNumberSet.cpp
index afab9033e5..58db2614fb 100644
--- a/cpp/src/qpid/framing/SequenceNumberSet.cpp
+++ b/cpp/src/qpid/framing/SequenceNumberSet.cpp
@@ -33,6 +33,7 @@ void SequenceNumberSet::encode(Buffer& buffer) const
void SequenceNumberSet::decode(Buffer& buffer)
{
+ clear();
uint16_t count = (buffer.getShort() / 4);
for (uint16_t i = 0; i < count; i++) {
push_back(SequenceNumber(buffer.getLong()));
diff --git a/cpp/src/qpid/framing/SequenceSet.cpp b/cpp/src/qpid/framing/SequenceSet.cpp
index 2046fac3e1..1c913e68f8 100644
--- a/cpp/src/qpid/framing/SequenceSet.cpp
+++ b/cpp/src/qpid/framing/SequenceSet.cpp
@@ -46,6 +46,7 @@ void SequenceSet::encode(Buffer& buffer) const
void SequenceSet::decode(Buffer& buffer)
{
+ clear();
uint16_t size = buffer.getShort();
uint16_t count = size / RANGE_SIZE;//number of ranges
if (size % RANGE_SIZE)
diff --git a/cpp/src/qpid/management/ManagementBroker.cpp b/cpp/src/qpid/management/ManagementBroker.cpp
index cc7a2dc4f3..2175bc4676 100644
--- a/cpp/src/qpid/management/ManagementBroker.cpp
+++ b/cpp/src/qpid/management/ManagementBroker.cpp
@@ -301,10 +301,9 @@ void ManagementBroker::sendBuffer(Buffer& buf,
return;
intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(
- ProtocolVersion(), exchange->getName (), 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
- AMQFrame content(in_place<AMQContentBody>());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), exchange->getName (), 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
+ AMQFrame content((AMQContentBody()));
content.castBody<AMQContentBody>()->decode(buf, length);
diff --git a/cpp/src/qpid/replication/ReplicatingEventListener.cpp b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
index d50ef852ef..52634e5640 100644
--- a/cpp/src/qpid/replication/ReplicatingEventListener.cpp
+++ b/cpp/src/qpid/replication/ReplicatingEventListener.cpp
@@ -77,8 +77,8 @@ void ReplicatingEventListener::deliverEnqueueMessage(const QueuedMessage& enqueu
boost::intrusive_ptr<Message> ReplicatingEventListener::createMessage(const FieldTable& headers)
{
boost::intrusive_ptr<Message> msg(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
- AMQFrame header(in_place<AMQHeaderBody>());
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0)));
+ AMQFrame header((AMQHeaderBody()));
header.setBof(false);
header.setEof(true);
header.setBos(true);
@@ -105,7 +105,7 @@ struct AppendingHandler : FrameHandler
boost::intrusive_ptr<Message> ReplicatingEventListener::cloneMessage(Queue& queue, boost::intrusive_ptr<Message> original)
{
boost::intrusive_ptr<Message> copy(new Message());
- AMQFrame method(in_place<MessageTransferBody>(ProtocolVersion(), EMPTY, 0, 0));
+ AMQFrame method((MessageTransferBody(ProtocolVersion(), EMPTY, 0, 0)));
AppendingHandler handler(copy);
handler.handle(method);
original->sendHeader(handler, std::numeric_limits<int16_t>::max());
diff --git a/cpp/src/qpid/sys/Thread.h b/cpp/src/qpid/sys/Thread.h
index 2fd59621d8..e2b904aa1a 100644
--- a/cpp/src/qpid/sys/Thread.h
+++ b/cpp/src/qpid/sys/Thread.h
@@ -25,10 +25,10 @@
#ifdef _WIN32
# define QPID_TSS __declspec(thread)
-#elif defined (gcc)
+#elif defined (__GNUC__)
# define QPID_TSS __thread
#else
-# define QPID_TSS
+# error "Dont know how to define QPID_TSS for this platform"
#endif
namespace qpid {