summaryrefslogtreecommitdiff
path: root/cpp
diff options
context:
space:
mode:
Diffstat (limited to 'cpp')
-rw-r--r--cpp/src/qpid/broker/Consumer.h6
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.cpp2
-rw-r--r--cpp/src/qpid/broker/DeliverableMessage.h4
-rw-r--r--cpp/src/qpid/broker/IncomingExecutionContext.cpp2
-rw-r--r--cpp/src/qpid/broker/IncomingExecutionContext.h4
-rw-r--r--cpp/src/qpid/broker/Message.h2
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.cpp4
-rw-r--r--cpp/src/qpid/broker/MessageBuilder.h6
-rw-r--r--cpp/src/qpid/broker/MessageDelivery.cpp8
-rw-r--r--cpp/src/qpid/broker/Persistable.h3
-rw-r--r--cpp/src/qpid/broker/Queue.cpp14
-rw-r--r--cpp/src/qpid/broker/Queue.h14
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoveredDequeue.h4
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.cpp2
-rw-r--r--cpp/src/qpid/broker/RecoveredEnqueue.h4
-rw-r--r--cpp/src/qpid/broker/RecoveryManagerImpl.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticHandler.cpp2
-rw-r--r--cpp/src/qpid/broker/SemanticState.cpp10
-rw-r--r--cpp/src/qpid/broker/SemanticState.h10
-rw-r--r--cpp/src/qpid/broker/TxPublish.cpp6
-rw-r--r--cpp/src/qpid/broker/TxPublish.h12
-rw-r--r--cpp/src/qpid/cluster/SessionManager.cpp4
-rw-r--r--cpp/src/qpid/management/ManagementAgent.cpp4
-rw-r--r--cpp/src/tests/BrokerChannelTest.cpp28
-rw-r--r--cpp/src/tests/ExchangeTest.cpp2
-rw-r--r--cpp/src/tests/MessageTest.cpp4
-rw-r--r--cpp/src/tests/MessageUtils.h11
-rw-r--r--cpp/src/tests/QueueTest.cpp26
-rw-r--r--cpp/src/tests/TxAckTest.cpp5
-rw-r--r--cpp/src/tests/TxPublishTest.cpp4
31 files changed, 107 insertions, 104 deletions
diff --git a/cpp/src/qpid/broker/Consumer.h b/cpp/src/qpid/broker/Consumer.h
index fb60fc88a8..5e09a00113 100644
--- a/cpp/src/qpid/broker/Consumer.h
+++ b/cpp/src/qpid/broker/Consumer.h
@@ -33,11 +33,11 @@ namespace qpid {
struct QueuedMessage
{
- Message::shared_ptr payload;
+ intrusive_ptr<Message> payload;
framing::SequenceNumber position;
Queue* queue;
- QueuedMessage(Queue* q, Message::shared_ptr msg, framing::SequenceNumber sn) :
+ QueuedMessage(Queue* q, intrusive_ptr<Message> msg, framing::SequenceNumber sn) :
payload(msg), position(sn), queue(q) {}
QueuedMessage(Queue* q) : queue(q) {}
};
@@ -53,7 +53,7 @@ namespace qpid {
Consumer(bool preAcquires = true) : acquires(preAcquires) {}
bool preAcquires() const { return acquires; }
virtual bool deliver(QueuedMessage& msg) = 0;
- virtual bool filter(Message::shared_ptr) { return true; }
+ virtual bool filter(intrusive_ptr<Message>) { return true; }
virtual ~Consumer(){}
};
}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.cpp b/cpp/src/qpid/broker/DeliverableMessage.cpp
index 9a3752d71c..e3fc39ce14 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.cpp
+++ b/cpp/src/qpid/broker/DeliverableMessage.cpp
@@ -22,7 +22,7 @@
using namespace qpid::broker;
-DeliverableMessage::DeliverableMessage(Message::shared_ptr& _msg) : msg(_msg)
+DeliverableMessage::DeliverableMessage(intrusive_ptr<Message>& _msg) : msg(_msg)
{
}
diff --git a/cpp/src/qpid/broker/DeliverableMessage.h b/cpp/src/qpid/broker/DeliverableMessage.h
index 07bca40461..39e5c04110 100644
--- a/cpp/src/qpid/broker/DeliverableMessage.h
+++ b/cpp/src/qpid/broker/DeliverableMessage.h
@@ -28,9 +28,9 @@
namespace qpid {
namespace broker {
class DeliverableMessage : public Deliverable{
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
public:
- DeliverableMessage(Message::shared_ptr& msg);
+ DeliverableMessage(intrusive_ptr<Message>& msg);
virtual void deliverTo(Queue::shared_ptr& queue);
Message& getMessage();
virtual ~DeliverableMessage(){}
diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.cpp b/cpp/src/qpid/broker/IncomingExecutionContext.cpp
index 4747c71033..f2c2965ecd 100644
--- a/cpp/src/qpid/broker/IncomingExecutionContext.cpp
+++ b/cpp/src/qpid/broker/IncomingExecutionContext.cpp
@@ -77,7 +77,7 @@ void IncomingExecutionContext::complete(const SequenceNumber& command)
completed.update(command, command);
}
-void IncomingExecutionContext::track(Message::shared_ptr msg)
+void IncomingExecutionContext::track(intrusive_ptr<Message> msg)
{
if (msg->isEnqueueComplete()) {
complete(msg->getCommandId());
diff --git a/cpp/src/qpid/broker/IncomingExecutionContext.h b/cpp/src/qpid/broker/IncomingExecutionContext.h
index 1e4a394be6..3056c2b4bc 100644
--- a/cpp/src/qpid/broker/IncomingExecutionContext.h
+++ b/cpp/src/qpid/broker/IncomingExecutionContext.h
@@ -30,7 +30,7 @@ namespace broker {
class IncomingExecutionContext
{
- typedef std::list<Message::shared_ptr> Messages;
+ typedef std::list<intrusive_ptr<Message> > Messages;
framing::Window window;
framing::AccumulatedAck completed;
Messages incomplete;
@@ -45,7 +45,7 @@ public:
void sync(const framing::SequenceNumber& point);
framing::SequenceNumber next();
void complete(const framing::SequenceNumber& command);
- void track(Message::shared_ptr);
+ void track(intrusive_ptr<Message>);
const framing::SequenceNumber& getMark();
framing::SequenceNumberSet getRange();
diff --git a/cpp/src/qpid/broker/Message.h b/cpp/src/qpid/broker/Message.h
index 7fe8628bf5..1581e4dd3c 100644
--- a/cpp/src/qpid/broker/Message.h
+++ b/cpp/src/qpid/broker/Message.h
@@ -45,7 +45,7 @@ class Queue;
class Message : public PersistableMessage {
public:
- typedef boost::shared_ptr<Message> shared_ptr;
+ typedef boost::intrusive_ptr<Message> shared_ptr;
Message(const framing::SequenceNumber& id = framing::SequenceNumber());
diff --git a/cpp/src/qpid/broker/MessageBuilder.cpp b/cpp/src/qpid/broker/MessageBuilder.cpp
index 42448babb5..a56c65333c 100644
--- a/cpp/src/qpid/broker/MessageBuilder.cpp
+++ b/cpp/src/qpid/broker/MessageBuilder.cpp
@@ -62,14 +62,14 @@ void MessageBuilder::handle(AMQFrame& frame)
void MessageBuilder::end()
{
- message.reset();
+ message = 0;
state = DORMANT;
staging = false;
}
void MessageBuilder::start(const SequenceNumber& id)
{
- message = Message::shared_ptr(new Message(id));
+ message = intrusive_ptr<Message>(new Message(id));
state = METHOD;
staging = false;
}
diff --git a/cpp/src/qpid/broker/MessageBuilder.h b/cpp/src/qpid/broker/MessageBuilder.h
index 134f93b68f..c7ed2abc04 100644
--- a/cpp/src/qpid/broker/MessageBuilder.h
+++ b/cpp/src/qpid/broker/MessageBuilder.h
@@ -21,9 +21,9 @@
#ifndef _MessageBuilder_
#define _MessageBuilder_
-#include "boost/shared_ptr.hpp"
#include "qpid/framing/FrameHandler.h"
#include "qpid/framing/SequenceNumber.h"
+#include "qpid/RefCounted.h"
namespace qpid {
namespace broker {
@@ -34,13 +34,13 @@ namespace qpid {
public:
MessageBuilder(MessageStore* const store = 0, uint64_t stagingThreshold = 0);
void handle(framing::AMQFrame& frame);
- boost::shared_ptr<Message> getMessage() { return message; }
+ intrusive_ptr<Message> getMessage() { return message; }
void start(const framing::SequenceNumber& id);
void end();
private:
enum State {DORMANT, METHOD, HEADER, CONTENT};
State state;
- boost::shared_ptr<Message> message;
+ intrusive_ptr<Message> message;
MessageStore* const store;
const uint64_t stagingThreshold;
bool staging;
diff --git a/cpp/src/qpid/broker/MessageDelivery.cpp b/cpp/src/qpid/broker/MessageDelivery.cpp
index b29850f9e1..d7f8bceae1 100644
--- a/cpp/src/qpid/broker/MessageDelivery.cpp
+++ b/cpp/src/qpid/broker/MessageDelivery.cpp
@@ -39,7 +39,7 @@ namespace broker{
struct BaseToken : DeliveryToken
{
virtual ~BaseToken() {}
- virtual AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id) = 0;
+ virtual AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id) = 0;
};
struct BasicGetToken : BaseToken
@@ -50,7 +50,7 @@ struct BasicGetToken : BaseToken
BasicGetToken(Queue::shared_ptr q) : queue(q) {}
- AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
+ AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
return AMQFrame(0, BasicGetOkBody(
ProtocolVersion(), id.getValue(), msg->getRedelivered(), msg->getExchangeName(),
@@ -66,7 +66,7 @@ struct BasicConsumeToken : BaseToken
BasicConsumeToken(const string c) : consumer(c) {}
- AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId id)
+ AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId id)
{
return AMQFrame(0, BasicDeliverBody(
ProtocolVersion(), consumer, id.getValue(),
@@ -84,7 +84,7 @@ struct MessageDeliveryToken : BaseToken
MessageDeliveryToken(const std::string& d, u_int8_t c, u_int8_t a) :
destination(d), confirmMode(c), acquireMode(a) {}
- AMQFrame sendMethod(Message::shared_ptr msg, DeliveryId /*id*/)
+ AMQFrame sendMethod(intrusive_ptr<Message> msg, DeliveryId /*id*/)
{
//may need to set the redelivered flag:
if (msg->getRedelivered()){
diff --git a/cpp/src/qpid/broker/Persistable.h b/cpp/src/qpid/broker/Persistable.h
index 3fb96d8568..36499c7a1a 100644
--- a/cpp/src/qpid/broker/Persistable.h
+++ b/cpp/src/qpid/broker/Persistable.h
@@ -24,6 +24,7 @@
#include "qpid/framing/amqp_types.h"
#include "qpid/framing/Buffer.h"
+#include "qpid/RefCounted.h"
namespace qpid {
namespace broker {
@@ -31,7 +32,7 @@ namespace broker {
/**
* Base class for all persistable objects
*/
-class Persistable
+class Persistable : public RefCounted
{
public:
/**
diff --git a/cpp/src/qpid/broker/Queue.cpp b/cpp/src/qpid/broker/Queue.cpp
index e6d79056cd..757f0aa62d 100644
--- a/cpp/src/qpid/broker/Queue.cpp
+++ b/cpp/src/qpid/broker/Queue.cpp
@@ -81,7 +81,7 @@ void Queue::notifyDurableIOComplete()
}
-void Queue::deliver(Message::shared_ptr& msg){
+void Queue::deliver(intrusive_ptr<Message>& msg){
if (msg->isImmediate() && getConsumerCount() == 0) {
if (alternateExchange) {
DeliverableMessage deliverable(msg);
@@ -108,7 +108,7 @@ void Queue::deliver(Message::shared_ptr& msg){
}
-void Queue::recover(Message::shared_ptr& msg){
+void Queue::recover(intrusive_ptr<Message>& msg){
push(msg);
msg->enqueueComplete(); // mark the message as enqueued
if (mgmtObject != 0)
@@ -120,7 +120,7 @@ void Queue::recover(Message::shared_ptr& msg){
}
}
-void Queue::process(Message::shared_ptr& msg){
+void Queue::process(intrusive_ptr<Message>& msg){
uint32_t mask = management::MSG_MASK_TX;
@@ -178,7 +178,7 @@ void Queue::flush(DispatchCompletion& completion)
* the message, or if the queue is exclusive to a single connection
* and has a single consumer (covers the JMS topic case).
*/
-bool Queue::exclude(Message::shared_ptr msg)
+bool Queue::exclude(intrusive_ptr<Message> msg)
{
RWlock::ScopedWlock locker(consumerLock);
if (exclusive) {
@@ -373,7 +373,7 @@ void Queue::pop(){
messages.pop_front();
}
-void Queue::push(Message::shared_ptr& msg){
+void Queue::push(intrusive_ptr<Message>& msg){
Mutex::ScopedLock locker(messageLock);
messages.push_back(QueuedMessage(this, msg, ++sequence));
if (policy.get()) {
@@ -412,7 +412,7 @@ bool Queue::canAutoDelete() const{
}
// return true if store exists,
-bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
+bool Queue::enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->enqueueAsync(this, store); //increment to async counter -- for message sent to more than one queue
@@ -423,7 +423,7 @@ bool Queue::enqueue(TransactionContext* ctxt, Message::shared_ptr msg)
}
// return true if store exists,
-bool Queue::dequeue(TransactionContext* ctxt, Message::shared_ptr msg)
+bool Queue::dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg)
{
if (msg->isPersistent() && store) {
msg->dequeueAsync(this, store); //increment to async counter -- for message sent to more than one queue
diff --git a/cpp/src/qpid/broker/Queue.h b/cpp/src/qpid/broker/Queue.h
index 9eca31e4fc..9a7b893f36 100644
--- a/cpp/src/qpid/broker/Queue.h
+++ b/cpp/src/qpid/broker/Queue.h
@@ -99,7 +99,7 @@ namespace qpid {
management::Queue::shared_ptr mgmtObject;
void pop();
- void push(Message::shared_ptr& msg);
+ void push(intrusive_ptr<Message>& msg);
bool dispatch(QueuedMessage& msg);
void setPolicy(std::auto_ptr<QueuePolicy> policy);
/**
@@ -113,7 +113,7 @@ namespace qpid {
bool seek(QueuedMessage& msg, const framing::SequenceNumber& position);
uint32_t getAcquirerCount() const;
bool getNextMessage(QueuedMessage& msg);
- bool exclude(Message::shared_ptr msg);
+ bool exclude(intrusive_ptr<Message> msg);
public:
@@ -140,12 +140,12 @@ namespace qpid {
* Delivers a message to the queue. Will record it as
* enqueued if persistent then process it.
*/
- void deliver(Message::shared_ptr& msg);
+ void deliver(intrusive_ptr<Message>& msg);
/**
* Dispatches the messages immediately to a consumer if
* one is available or stores it for later if not.
*/
- void process(Message::shared_ptr& msg);
+ void process(intrusive_ptr<Message>& msg);
/**
* Returns a message to the in-memory queue (due to lack
* of acknowledegement from a receiver). If a consumer is
@@ -156,7 +156,7 @@ namespace qpid {
/**
* Used during recovery to add stored messages back to the queue
*/
- void recover(Message::shared_ptr& msg);
+ void recover(intrusive_ptr<Message>& msg);
/**
* Request dispatch any queued messages providing there are
* consumers for them. Only one thread can be dispatching
@@ -181,11 +181,11 @@ namespace qpid {
inline bool isAutoDelete() const { return autodelete; }
bool canAutoDelete() const;
- bool enqueue(TransactionContext* ctxt, Message::shared_ptr msg);
+ bool enqueue(TransactionContext* ctxt, intrusive_ptr<Message> msg);
/**
* dequeue from store (only done once messages is acknowledged)
*/
- bool dequeue(TransactionContext* ctxt, Message::shared_ptr msg);
+ bool dequeue(TransactionContext* ctxt, intrusive_ptr<Message> msg);
/**
* dequeues from memory only
*/
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.cpp b/cpp/src/qpid/broker/RecoveredDequeue.cpp
index b59a7f98c8..9b5e23884e 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.cpp
+++ b/cpp/src/qpid/broker/RecoveredDequeue.cpp
@@ -22,7 +22,7 @@
using namespace qpid::broker;
-RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+RecoveredDequeue::RecoveredDequeue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {}
bool RecoveredDequeue::prepare(TransactionContext*) throw(){
//should never be called; transaction has already prepared if an enqueue is recovered
diff --git a/cpp/src/qpid/broker/RecoveredDequeue.h b/cpp/src/qpid/broker/RecoveredDequeue.h
index a6d64e6faf..82668adb67 100644
--- a/cpp/src/qpid/broker/RecoveredDequeue.h
+++ b/cpp/src/qpid/broker/RecoveredDequeue.h
@@ -34,10 +34,10 @@ namespace qpid {
namespace broker {
class RecoveredDequeue : public TxOp{
Queue::shared_ptr queue;
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
public:
- RecoveredDequeue(Queue::shared_ptr queue, Message::shared_ptr msg);
+ RecoveredDequeue(Queue::shared_ptr queue, intrusive_ptr<Message> msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.cpp b/cpp/src/qpid/broker/RecoveredEnqueue.cpp
index e5019affd8..5eeab7a435 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.cpp
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.cpp
@@ -22,7 +22,7 @@
using namespace qpid::broker;
-RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, Message::shared_ptr _msg) : queue(_queue), msg(_msg) {}
+RecoveredEnqueue::RecoveredEnqueue(Queue::shared_ptr _queue, intrusive_ptr<Message> _msg) : queue(_queue), msg(_msg) {}
bool RecoveredEnqueue::prepare(TransactionContext*) throw(){
//should never be called; transaction has already prepared if an enqueue is recovered
diff --git a/cpp/src/qpid/broker/RecoveredEnqueue.h b/cpp/src/qpid/broker/RecoveredEnqueue.h
index 9ada62e62b..25b55e3e0c 100644
--- a/cpp/src/qpid/broker/RecoveredEnqueue.h
+++ b/cpp/src/qpid/broker/RecoveredEnqueue.h
@@ -34,10 +34,10 @@ namespace qpid {
namespace broker {
class RecoveredEnqueue : public TxOp{
Queue::shared_ptr queue;
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
public:
- RecoveredEnqueue(Queue::shared_ptr queue, Message::shared_ptr msg);
+ RecoveredEnqueue(Queue::shared_ptr queue, intrusive_ptr<Message> msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
index 1e3168137e..65583f1964 100644
--- a/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
+++ b/cpp/src/qpid/broker/RecoveryManagerImpl.cpp
@@ -41,7 +41,7 @@ RecoveryManagerImpl::~RecoveryManagerImpl() {}
class RecoverableMessageImpl : public RecoverableMessage
{
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
const uint64_t stagingThreshold;
public:
RecoverableMessageImpl(Message::shared_ptr& _msg, uint64_t _stagingThreshold)
diff --git a/cpp/src/qpid/broker/SemanticHandler.cpp b/cpp/src/qpid/broker/SemanticHandler.cpp
index d6e13e0a55..ba43b5ecba 100644
--- a/cpp/src/qpid/broker/SemanticHandler.cpp
+++ b/cpp/src/qpid/broker/SemanticHandler.cpp
@@ -146,7 +146,7 @@ void SemanticHandler::handleL3(framing::AMQMethodBody* method)
void SemanticHandler::handleContent(AMQFrame& frame)
{
- Message::shared_ptr msg(msgBuilder.getMessage());
+ intrusive_ptr<Message> msg(msgBuilder.getMessage());
if (!msg) {//start of frameset will be indicated by frame flags
msgBuilder.start(incoming.next());
msg = msgBuilder.getMessage();
diff --git a/cpp/src/qpid/broker/SemanticState.cpp b/cpp/src/qpid/broker/SemanticState.cpp
index 2b57c66645..270cdca731 100644
--- a/cpp/src/qpid/broker/SemanticState.cpp
+++ b/cpp/src/qpid/broker/SemanticState.cpp
@@ -230,7 +230,7 @@ void SemanticState::record(const DeliveryRecord& delivery)
delivery.addTo(outstanding);
}
-bool SemanticState::checkPrefetch(Message::shared_ptr& msg)
+bool SemanticState::checkPrefetch(intrusive_ptr<Message>& msg)
{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
@@ -288,13 +288,13 @@ bool SemanticState::ConsumerImpl::deliver(QueuedMessage& msg)
}
}
-bool SemanticState::ConsumerImpl::filter(Message::shared_ptr msg)
+bool SemanticState::ConsumerImpl::filter(intrusive_ptr<Message> msg)
{
return !(nolocal &&
&parent->getSession().getConnection() == msg->getPublisher());
}
-bool SemanticState::ConsumerImpl::checkCredit(Message::shared_ptr& msg)
+bool SemanticState::ConsumerImpl::checkCredit(intrusive_ptr<Message>& msg)
{
Mutex::ScopedLock l(lock);
if (msgCredit == 0 || (byteCredit != 0xFFFFFFFF && byteCredit < msg->getRequiredCredit())) {
@@ -331,7 +331,7 @@ void SemanticState::cancel(ConsumerImpl::shared_ptr c)
}
}
-void SemanticState::handle(Message::shared_ptr msg) {
+void SemanticState::handle(intrusive_ptr<Message> msg) {
if (txBuffer.get()) {
TxPublish* deliverable(new TxPublish(msg));
TxOp::shared_ptr op(deliverable);
@@ -343,7 +343,7 @@ void SemanticState::handle(Message::shared_ptr msg) {
}
}
-void SemanticState::route(Message::shared_ptr msg, Deliverable& strategy) {
+void SemanticState::route(intrusive_ptr<Message> msg, Deliverable& strategy) {
std::string exchangeName = msg->getExchangeName();
if (!cacheExchange || cacheExchange->getName() != exchangeName){
cacheExchange = session.getConnection().broker.getExchanges().get(exchangeName);
diff --git a/cpp/src/qpid/broker/SemanticState.h b/cpp/src/qpid/broker/SemanticState.h
index d0e3eed8e1..ea58a74f45 100644
--- a/cpp/src/qpid/broker/SemanticState.h
+++ b/cpp/src/qpid/broker/SemanticState.h
@@ -68,7 +68,7 @@ class SemanticState : public framing::FrameHandler::Chains,
uint32_t msgCredit;
uint32_t byteCredit;
- bool checkCredit(Message::shared_ptr& msg);
+ bool checkCredit(intrusive_ptr<Message>& msg);
public:
typedef shared_ptr<ConsumerImpl> shared_ptr;
@@ -78,7 +78,7 @@ class SemanticState : public framing::FrameHandler::Chains,
bool ack, bool nolocal, bool acquire);
~ConsumerImpl();
bool deliver(QueuedMessage& msg);
- bool filter(Message::shared_ptr msg);
+ bool filter(intrusive_ptr<Message> msg);
void setWindowMode();
void setCreditMode();
@@ -124,9 +124,9 @@ class SemanticState : public framing::FrameHandler::Chains,
boost::shared_ptr<Exchange> cacheExchange;
- void route(Message::shared_ptr msg, Deliverable& strategy);
+ void route(intrusive_ptr<Message> msg, Deliverable& strategy);
void record(const DeliveryRecord& delivery);
- bool checkPrefetch(Message::shared_ptr& msg);
+ bool checkPrefetch(intrusive_ptr<Message>& msg);
void checkDtxTimeout();
ConsumerImpl::shared_ptr find(const std::string& destination);
void ack(DeliveryId deliveryTag, DeliveryId endTag, bool cumulative);
@@ -187,7 +187,7 @@ class SemanticState : public framing::FrameHandler::Chains,
void acquire(DeliveryId first, DeliveryId last, std::vector<DeliveryId>& acquired);
void release(DeliveryId first, DeliveryId last);
void reject(DeliveryId first, DeliveryId last);
- void handle(Message::shared_ptr msg);
+ void handle(intrusive_ptr<Message> msg);
};
}} // namespace qpid::broker
diff --git a/cpp/src/qpid/broker/TxPublish.cpp b/cpp/src/qpid/broker/TxPublish.cpp
index ac246a3bfe..07e72c49f4 100644
--- a/cpp/src/qpid/broker/TxPublish.cpp
+++ b/cpp/src/qpid/broker/TxPublish.cpp
@@ -23,7 +23,7 @@
using namespace qpid::broker;
-TxPublish::TxPublish(Message::shared_ptr _msg) : msg(_msg) {}
+TxPublish::TxPublish(intrusive_ptr<Message> _msg) : msg(_msg) {}
bool TxPublish::prepare(TransactionContext* ctxt) throw(){
try{
@@ -47,7 +47,7 @@ void TxPublish::deliverTo(Queue::shared_ptr& queue){
delivered = true;
}
-TxPublish::Prepare::Prepare(TransactionContext* _ctxt, Message::shared_ptr& _msg)
+TxPublish::Prepare::Prepare(TransactionContext* _ctxt, intrusive_ptr<Message>& _msg)
: ctxt(_ctxt), msg(_msg){}
void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
@@ -61,7 +61,7 @@ void TxPublish::Prepare::operator()(Queue::shared_ptr& queue){
}
}
-TxPublish::Commit::Commit(Message::shared_ptr& _msg) : msg(_msg){}
+TxPublish::Commit::Commit(intrusive_ptr<Message>& _msg) : msg(_msg){}
void TxPublish::Commit::operator()(Queue::shared_ptr& queue){
queue->process(msg);
diff --git a/cpp/src/qpid/broker/TxPublish.h b/cpp/src/qpid/broker/TxPublish.h
index 2e3268010a..b4323864bc 100644
--- a/cpp/src/qpid/broker/TxPublish.h
+++ b/cpp/src/qpid/broker/TxPublish.h
@@ -45,24 +45,24 @@ namespace qpid {
class TxPublish : public TxOp, public Deliverable{
class Prepare{
TransactionContext* ctxt;
- Message::shared_ptr& msg;
+ intrusive_ptr<Message>& msg;
public:
- Prepare(TransactionContext* ctxt, Message::shared_ptr& msg);
+ Prepare(TransactionContext* ctxt, intrusive_ptr<Message>& msg);
void operator()(Queue::shared_ptr& queue);
};
class Commit{
- Message::shared_ptr& msg;
+ intrusive_ptr<Message>& msg;
public:
- Commit(Message::shared_ptr& msg);
+ Commit(intrusive_ptr<Message>& msg);
void operator()(Queue::shared_ptr& queue);
};
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
std::list<Queue::shared_ptr> queues;
public:
- TxPublish(Message::shared_ptr msg);
+ TxPublish(intrusive_ptr<Message> msg);
virtual bool prepare(TransactionContext* ctxt) throw();
virtual void commit() throw();
virtual void rollback() throw();
diff --git a/cpp/src/qpid/cluster/SessionManager.cpp b/cpp/src/qpid/cluster/SessionManager.cpp
index 537b17a23c..71f52507b8 100644
--- a/cpp/src/qpid/cluster/SessionManager.cpp
+++ b/cpp/src/qpid/cluster/SessionManager.cpp
@@ -74,8 +74,8 @@ struct SessionManager::BrokerHandler : public FrameHandler, private ChannelAdapt
virtual void send(const AMQBody&) {}
//delivery adapter methods, also no-ops:
- virtual DeliveryId deliver(Message::shared_ptr&, DeliveryToken::shared_ptr) { return 0; }
- virtual void redeliver(Message::shared_ptr&, DeliveryToken::shared_ptr, DeliveryId) {}
+ virtual DeliveryId deliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr) { return 0; }
+ virtual void redeliver(intrusive_ptr<Message>&, DeliveryToken::shared_ptr, DeliveryId) {}
};
SessionManager::~SessionManager(){}
diff --git a/cpp/src/qpid/management/ManagementAgent.cpp b/cpp/src/qpid/management/ManagementAgent.cpp
index b5b4da09b8..2f7f25b9d6 100644
--- a/cpp/src/qpid/management/ManagementAgent.cpp
+++ b/cpp/src/qpid/management/ManagementAgent.cpp
@@ -96,7 +96,7 @@ void ManagementAgent::PeriodicProcessing (void)
if (managementObjects.empty ())
return;
- Message::shared_ptr msg (new Message ());
+ intrusive_ptr<Message> msg (new Message ());
// Build the magic number for the management message.
msgBuffer.putOctet ('A');
@@ -294,7 +294,7 @@ void ManagementAgent::dispatchCommand (Deliverable& deliverable,
iter->second->doMethod (methodName, inBuffer, outBuffer);
}
- Message::shared_ptr outMsg (new Message ());
+ intrusive_ptr<Message> outMsg (new Message ());
uint32_t msgSize = 4096 - outBuffer.available ();
outBuffer.reset ();
AMQFrame method (0, MessageTransferBody(ProtocolVersion(),
diff --git a/cpp/src/tests/BrokerChannelTest.cpp b/cpp/src/tests/BrokerChannelTest.cpp
index f0ccf73189..8752b8afeb 100644
--- a/cpp/src/tests/BrokerChannelTest.cpp
+++ b/cpp/src/tests/BrokerChannelTest.cpp
@@ -59,16 +59,16 @@ struct MockHandler : ConnectionOutputHandler{
struct DeliveryRecorder : DeliveryAdapter
{
DeliveryId id;
- typedef std::pair<Message::shared_ptr, DeliveryToken::shared_ptr> Delivery;
+ typedef std::pair<intrusive_ptr<Message>, DeliveryToken::shared_ptr> Delivery;
std::vector<Delivery> delivered;
- DeliveryId deliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token)
+ DeliveryId deliver(intrusive_ptr<Message>& msg, DeliveryToken::shared_ptr token)
{
delivered.push_back(Delivery(msg, token));
return ++id;
}
- void redeliver(Message::shared_ptr& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
+ void redeliver(intrusive_ptr<Message>& msg, DeliveryToken::shared_ptr token, DeliveryId /*tag*/)
{
delivered.push_back(Delivery(msg, token));
}
@@ -215,7 +215,7 @@ class BrokerChannelTest : public CppUnit::TestCase
void testDeliveryNoAck(){
Channel channel(connection, recorder, 7);
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ intrusive_ptr<Message> msg(createMessage("test", "my_routing_key", "my_message_id", 14));
Queue::shared_ptr queue(new Queue("my_queue"));
string tag("test");
DeliveryToken::shared_ptr token(MessageDelivery::getBasicConsumeToken("my-token"));
@@ -239,9 +239,9 @@ class BrokerChannelTest : public CppUnit::TestCase
const string data1("abcd");
const string data2("efghijk");
const string data3("lmnopqrstuvwxyz");
- Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size()));
- Message::shared_ptr msg2(createMessage("e", "B", "MsgB", data2.size()));
- Message::shared_ptr msg3(createMessage("e", "C", "MsgC", data3.size()));
+ intrusive_ptr<Message> msg1(createMessage("e", "A", "MsgA", data1.size()));
+ intrusive_ptr<Message> msg2(createMessage("e", "B", "MsgB", data2.size()));
+ intrusive_ptr<Message> msg3(createMessage("e", "C", "MsgC", data3.size()));
addContent(msg1, data1);
addContent(msg2, data2);
addContent(msg3, data3);
@@ -261,7 +261,7 @@ class BrokerChannelTest : public CppUnit::TestCase
queue->deliver(msg3);
sleep(2);
- Message::shared_ptr next = queue->dequeue().payload;
+ intrusive_ptr<Message> next = queue->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
CPPUNIT_ASSERT_EQUAL((uint32_t) data1.size(), next->encodedContentSize());
next = queue->dequeue().payload;
@@ -289,7 +289,7 @@ class BrokerChannelTest : public CppUnit::TestCase
MockMessageStore store;
{//must ensure that store is last thing deleted
const string data1("abcd");
- Message::shared_ptr msg1(createMessage("e", "A", "MsgA", data1.size()));
+ intrusive_ptr<Message> msg1(createMessage("e", "A", "MsgA", data1.size()));
addContent(msg1, data1);
Queue::shared_ptr queue1(new Queue("my_queue1", false, &store, 0));
@@ -300,7 +300,7 @@ class BrokerChannelTest : public CppUnit::TestCase
queue3->deliver(msg1);
sleep(2);
- Message::shared_ptr next = queue1->dequeue().payload;
+ intrusive_ptr<Message> next = queue1->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
next = queue2->dequeue().payload;
CPPUNIT_ASSERT_EQUAL(msg1, next);
@@ -327,7 +327,7 @@ class BrokerChannelTest : public CppUnit::TestCase
channel.flow(false);
//'publish' a message
- Message::shared_ptr msg(createMessage("test", "my_routing_key", "my_message_id", 14));
+ intrusive_ptr<Message> msg(createMessage("test", "my_routing_key", "my_message_id", 14));
addContent(msg, "abcdefghijklmn");
queue->deliver(msg);
@@ -342,9 +342,9 @@ class BrokerChannelTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(token, recorder.delivered.front().second);
}
- Message::shared_ptr createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
+ intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey, const string& messageId, uint64_t contentSize)
{
- Message::shared_ptr msg(new Message());
+ intrusive_ptr<Message> msg(new Message());
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
AMQFrame header(0, AMQHeaderBody());
@@ -358,7 +358,7 @@ class BrokerChannelTest : public CppUnit::TestCase
return msg;
}
- void addContent(Message::shared_ptr msg, const string& data)
+ void addContent(intrusive_ptr<Message> msg, const string& data)
{
AMQFrame content(0, AMQContentBody(data));
msg->getFrames().append(content);
diff --git a/cpp/src/tests/ExchangeTest.cpp b/cpp/src/tests/ExchangeTest.cpp
index 35fc5e0bdb..d1eca203c6 100644
--- a/cpp/src/tests/ExchangeTest.cpp
+++ b/cpp/src/tests/ExchangeTest.cpp
@@ -64,7 +64,7 @@ class ExchangeTest : public CppUnit::TestCase
queue.reset();
queue2.reset();
- Message::shared_ptr msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
+ intrusive_ptr<Message> msgPtr(MessageUtils::createMessage("exchange", "key", "id"));
DeliverableMessage msg(msgPtr);
topic.route(msg, "abc", 0);
direct.route(msg, "abc", 0);
diff --git a/cpp/src/tests/MessageTest.cpp b/cpp/src/tests/MessageTest.cpp
index 3d3de5e9d0..f5239165b0 100644
--- a/cpp/src/tests/MessageTest.cpp
+++ b/cpp/src/tests/MessageTest.cpp
@@ -49,7 +49,7 @@ class MessageTest : public CppUnit::TestCase
string data1("abcdefg");
string data2("hijklmn");
- Message::shared_ptr msg(new Message());
+ intrusive_ptr<Message> msg(new Message());
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
AMQFrame header(0, AMQHeaderBody());
@@ -77,7 +77,7 @@ class MessageTest : public CppUnit::TestCase
msg->encode(wbuffer);
Buffer rbuffer(buff, msg->encodedSize());
- msg.reset(new Message());
+ msg = new Message();
msg->decodeHeader(rbuffer);
msg->decodeContent(rbuffer);
CPPUNIT_ASSERT_EQUAL(exchange, msg->getExchangeName());
diff --git a/cpp/src/tests/MessageUtils.h b/cpp/src/tests/MessageUtils.h
index 7fb1755c4b..cf0f130fa6 100644
--- a/cpp/src/tests/MessageUtils.h
+++ b/cpp/src/tests/MessageUtils.h
@@ -23,15 +23,16 @@
#include "qpid/broker/MessageDelivery.h"
#include "qpid/framing/AMQFrame.h"
-using namespace qpid::broker;
-using namespace qpid::framing;
+using namespace qpid;
+using namespace broker;
+using namespace framing;
struct MessageUtils
{
- static Message::shared_ptr createMessage(const string& exchange, const string& routingKey,
+ static intrusive_ptr<Message> createMessage(const string& exchange, const string& routingKey,
const string& messageId, uint64_t contentSize = 0)
{
- Message::shared_ptr msg(new Message());
+ intrusive_ptr<Message> msg(new Message());
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
AMQFrame header(0, AMQHeaderBody());
@@ -45,7 +46,7 @@ struct MessageUtils
return msg;
}
- static void addContent(Message::shared_ptr msg, const string& data)
+ static void addContent(intrusive_ptr<Message> msg, const string& data)
{
AMQFrame content(0, AMQContentBody(data));
msg->getFrames().append(content);
diff --git a/cpp/src/tests/QueueTest.cpp b/cpp/src/tests/QueueTest.cpp
index 3235fe2418..2d84d23b6f 100644
--- a/cpp/src/tests/QueueTest.cpp
+++ b/cpp/src/tests/QueueTest.cpp
@@ -38,7 +38,7 @@ class TestConsumer : public virtual Consumer{
public:
typedef shared_ptr<TestConsumer> shared_ptr;
- Message::shared_ptr last;
+ intrusive_ptr<Message> last;
bool received;
TestConsumer(): received(false) {};
@@ -71,8 +71,8 @@ class QueueTest : public CppUnit::TestCase
public:
- Message::shared_ptr message(std::string exchange, std::string routingKey) {
- Message::shared_ptr msg(new Message());
+ intrusive_ptr<Message> message(std::string exchange, std::string routingKey) {
+ intrusive_ptr<Message> msg(new Message());
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, exchange, 0, 0));
AMQFrame header(0, AMQHeaderBody());
msg->getFrames().append(method);
@@ -85,14 +85,14 @@ class QueueTest : public CppUnit::TestCase
void testAsyncMessage(){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- Message::shared_ptr received;
+ intrusive_ptr<Message> received;
TestConsumer::shared_ptr c1(new TestConsumer());
queue->consume(c1);
//Test basic delivery:
- Message::shared_ptr msg1 = message("e", "A");
+ intrusive_ptr<Message> msg1 = message("e", "A");
msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
sleep(2);
@@ -109,7 +109,7 @@ class QueueTest : public CppUnit::TestCase
void testAsyncMessageCount(){
Queue::shared_ptr queue(new Queue("my_test_queue", true));
- Message::shared_ptr msg1 = message("e", "A");
+ intrusive_ptr<Message> msg1 = message("e", "A");
msg1->enqueueAsync();//this is done on enqueue which is not called from process
queue->process(msg1);
@@ -134,9 +134,9 @@ class QueueTest : public CppUnit::TestCase
CPPUNIT_ASSERT_EQUAL(uint32_t(2), queue->getConsumerCount());
//Test basic delivery:
- Message::shared_ptr msg1 = message("e", "A");
- Message::shared_ptr msg2 = message("e", "B");
- Message::shared_ptr msg3 = message("e", "C");
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "B");
+ intrusive_ptr<Message> msg3 = message("e", "C");
queue->deliver(msg1);
if (!c1->received)
@@ -183,10 +183,10 @@ class QueueTest : public CppUnit::TestCase
void testDequeue(){
Queue::shared_ptr queue(new Queue("my_queue", true));
- Message::shared_ptr msg1 = message("e", "A");
- Message::shared_ptr msg2 = message("e", "B");
- Message::shared_ptr msg3 = message("e", "C");
- Message::shared_ptr received;
+ intrusive_ptr<Message> msg1 = message("e", "A");
+ intrusive_ptr<Message> msg2 = message("e", "B");
+ intrusive_ptr<Message> msg3 = message("e", "C");
+ intrusive_ptr<Message> received;
queue->deliver(msg1);
queue->deliver(msg2);
diff --git a/cpp/src/tests/TxAckTest.cpp b/cpp/src/tests/TxAckTest.cpp
index dd57736a0c..495b8b550e 100644
--- a/cpp/src/tests/TxAckTest.cpp
+++ b/cpp/src/tests/TxAckTest.cpp
@@ -29,6 +29,7 @@
using std::list;
using std::vector;
+using namespace qpid;
using namespace qpid::broker;
using namespace qpid::framing;
@@ -58,7 +59,7 @@ class TxAckTest : public CppUnit::TestCase
AccumulatedAck acked;
TestMessageStore store;
Queue::shared_ptr queue;
- vector<Message::shared_ptr> messages;
+ vector<intrusive_ptr<Message> > messages;
list<DeliveryRecord> deliveries;
TxAck op;
@@ -68,7 +69,7 @@ public:
TxAckTest() : acked(0), queue(new Queue("my_queue", false, &store, 0)), op(acked, deliveries)
{
for(int i = 0; i < 10; i++){
- Message::shared_ptr msg(new Message());
+ intrusive_ptr<Message> msg(new Message());
AMQFrame method(0, MessageTransferBody(ProtocolVersion(), 0, "exchange", 0, 0));
AMQFrame header(0, AMQHeaderBody());
msg->getFrames().append(method);
diff --git a/cpp/src/tests/TxPublishTest.cpp b/cpp/src/tests/TxPublishTest.cpp
index 4ec526f207..b969598f1d 100644
--- a/cpp/src/tests/TxPublishTest.cpp
+++ b/cpp/src/tests/TxPublishTest.cpp
@@ -63,7 +63,7 @@ class TxPublishTest : public CppUnit::TestCase
TestMessageStore store;
Queue::shared_ptr queue1;
Queue::shared_ptr queue2;
- Message::shared_ptr msg;
+ intrusive_ptr<Message> msg;
TxPublish op;
public:
@@ -99,7 +99,7 @@ public:
op.prepare(0);
op.commit();
CPPUNIT_ASSERT_EQUAL((uint32_t) 1, queue1->getMessageCount());
- Message::shared_ptr msg_dequeue = queue1->dequeue().payload;
+ intrusive_ptr<Message> msg_dequeue = queue1->dequeue().payload;
CPPUNIT_ASSERT_EQUAL( true, ((PersistableMessage*) msg_dequeue.get())->isEnqueueComplete());
CPPUNIT_ASSERT_EQUAL(msg, msg_dequeue);