summaryrefslogtreecommitdiff
path: root/cpp/src/qpid/broker/BrokerChannel.cpp
diff options
context:
space:
mode:
authorGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
committerGordon Sim <gsim@apache.org>2007-07-27 15:44:52 +0000
commit80406d0fb680239a0141b81fb0b9f20d20c9b1e1 (patch)
tree13677bf773bf25db03144aa72c97a49d2810240d /cpp/src/qpid/broker/BrokerChannel.cpp
parenta9232d5a02a19f093f212cb0b76772a20b45cb1b (diff)
downloadqpid-python-80406d0fb680239a0141b81fb0b9f20d20c9b1e1.tar.gz
Use execution layer to acknowledge messages.
Turn off 0-9 framing of requests and responses. Some refactoring around message delivery. git-svn-id: https://svn.apache.org/repos/asf/incubator/qpid/trunk/qpid@560285 13f79535-47bb-0310-9956-ffa450edef68
Diffstat (limited to 'cpp/src/qpid/broker/BrokerChannel.cpp')
-rw-r--r--cpp/src/qpid/broker/BrokerChannel.cpp86
1 files changed, 55 insertions, 31 deletions
diff --git a/cpp/src/qpid/broker/BrokerChannel.cpp b/cpp/src/qpid/broker/BrokerChannel.cpp
index a598717c5d..c50fbd5559 100644
--- a/cpp/src/qpid/broker/BrokerChannel.cpp
+++ b/cpp/src/qpid/broker/BrokerChannel.cpp
@@ -49,9 +49,10 @@ using namespace qpid::framing;
using namespace qpid::sys;
-Channel::Channel(Connection& con, ChannelId _id, MessageStore* const _store) :
+Channel::Channel(Connection& con, DeliveryAdapter& _out, ChannelId _id, MessageStore* const _store) :
id(_id),
connection(con),
+ out(_out),
currentDeliveryTag(1),
prefetchSize(0),
prefetchCount(0),
@@ -76,7 +77,7 @@ bool Channel::exists(const string& consumerTag){
// TODO aconway 2007-02-12: Why is connection token passed in instead
// of using the channel's parent connection?
-void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
+void Channel::consume(DeliveryToken::shared_ptr token, string& tagInOut,
Queue::shared_ptr queue, bool acks,
bool exclusive, ConnectionToken* const connection,
const FieldTable*)
@@ -84,7 +85,7 @@ void Channel::consume(std::auto_ptr<DeliveryAdapter> adapter, string& tagInOut,
if(tagInOut.empty())
tagInOut = tagGenerator.generate();
std::auto_ptr<ConsumerImpl> c(
- new ConsumerImpl(this, adapter, tagInOut, queue, connection, acks));
+ new ConsumerImpl(this, token, tagInOut, queue, connection, acks));
queue->consume(c.get(), exclusive);//may throw exception
consumers.insert(tagInOut, c.release());
}
@@ -97,7 +98,8 @@ void Channel::cancel(const string& tag){
consumers.erase(i);
}
-void Channel::close(){
+void Channel::close()
+{
opened = false;
consumers.clear();
if (dtxBuffer.get()) {
@@ -106,11 +108,15 @@ void Channel::close(){
recover(true);
}
-void Channel::startTx(){
+void Channel::startTx()
+{
txBuffer = TxBuffer::shared_ptr(new TxBuffer());
}
-void Channel::commit(){
+void Channel::commit()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
TxOp::shared_ptr txAck(new TxAck(accumulatedAck, unacked));
txBuffer->enlist(txAck);
if (txBuffer->commitLocal(store)) {
@@ -118,16 +124,21 @@ void Channel::commit(){
}
}
-void Channel::rollback(){
+void Channel::rollback()
+{
+ if (!txBuffer) throw ConnectionException(503, "Channel has not been selected for use with transactions");
+
txBuffer->rollback();
accumulatedAck.clear();
}
-void Channel::selectDtx(){
+void Channel::selectDtx()
+{
dtxSelected = true;
}
-void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
+void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join)
+{
if (!dtxSelected) {
throw ConnectionException(503, "Channel has not been selected for use with dtx");
}
@@ -140,7 +151,8 @@ void Channel::startDtx(const std::string& xid, DtxManager& mgr, bool join){
}
}
-void Channel::endDtx(const std::string& xid, bool fail){
+void Channel::endDtx(const std::string& xid, bool fail)
+{
if (!dtxBuffer) {
throw ConnectionException(503, boost::format("xid %1% not associated with this channel") % xid);
}
@@ -160,7 +172,8 @@ void Channel::endDtx(const std::string& xid, bool fail){
dtxBuffer.reset();
}
-void Channel::suspendDtx(const std::string& xid){
+void Channel::suspendDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on suspend")
% dtxBuffer->getXid() % xid);
@@ -171,7 +184,8 @@ void Channel::suspendDtx(const std::string& xid){
dtxBuffer->setSuspended(true);
}
-void Channel::resumeDtx(const std::string& xid){
+void Channel::resumeDtx(const std::string& xid)
+{
if (dtxBuffer->getXid() != xid) {
throw ConnectionException(503, boost::format("xid specified on start was %1%, but %2% specified on resume")
% dtxBuffer->getXid() % xid);
@@ -199,20 +213,22 @@ void Channel::record(const DeliveryRecord& delivery)
delivery.addTo(&outstanding);
}
-bool Channel::checkPrefetch(Message::shared_ptr& msg){
+bool Channel::checkPrefetch(Message::shared_ptr& msg)
+{
Mutex::ScopedLock locker(deliveryLock);
bool countOk = !prefetchCount || prefetchCount > unacked.size();
bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty();
return countOk && sizeOk;
}
-Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, std::auto_ptr<DeliveryAdapter> _adapter,
+Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, DeliveryToken::shared_ptr _token,
const string& _tag, Queue::shared_ptr _queue,
ConnectionToken* const _connection, bool ack
- ) : parent(_parent), adapter(_adapter), tag(_tag), queue(_queue), connection(_connection),
+ ) : parent(_parent), token(_token), tag(_tag), queue(_queue), connection(_connection),
ackExpected(ack), blocked(false) {}
-bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
+bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg)
+{
if(!connection || connection != msg->getPublisher()){//check for no_local
if(!parent->flowActive || (ackExpected && !parent->checkPrefetch(msg))){
blocked = true;
@@ -220,11 +236,10 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
blocked = false;
Mutex::ScopedLock locker(parent->deliveryLock);
- uint64_t deliveryTag = adapter->getNextDeliveryTag();
+ uint64_t deliveryTag = parent->out.deliver(msg, token);
if(ackExpected){
parent->record(DeliveryRecord(msg, queue, tag, deliveryTag));
}
- adapter->deliver(msg, deliveryTag);
return true;
}
@@ -234,14 +249,15 @@ bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){
void Channel::ConsumerImpl::redeliver(Message::shared_ptr& msg, uint64_t deliveryTag) {
Mutex::ScopedLock locker(parent->deliveryLock);
- adapter->deliver(msg, deliveryTag);
+ parent->out.redeliver(msg, token, deliveryTag);
}
Channel::ConsumerImpl::~ConsumerImpl() {
cancel();
}
-void Channel::ConsumerImpl::cancel(){
+void Channel::ConsumerImpl::cancel()
+{
if(queue) {
queue->cancel(this);
if (queue->canAutoDelete()) {
@@ -251,27 +267,32 @@ void Channel::ConsumerImpl::cancel(){
}
}
-void Channel::ConsumerImpl::requestDispatch(){
+void Channel::ConsumerImpl::requestDispatch()
+{
if(blocked)
queue->requestDispatch();
}
-void Channel::handleInlineTransfer(Message::shared_ptr msg){
+void Channel::handleInlineTransfer(Message::shared_ptr msg)
+{
complete(msg);
}
-void Channel::handlePublish(Message* _message){
+void Channel::handlePublish(Message* _message)
+{
Message::shared_ptr message(_message);
messageBuilder.initialise(message);
}
-void Channel::handleHeader(AMQHeaderBody::shared_ptr header){
+void Channel::handleHeader(AMQHeaderBody::shared_ptr header)
+{
messageBuilder.setHeader(header);
//at this point, decide based on the size of the message whether we want
//to stage it by saving content directly to disk as it arrives
}
-void Channel::handleContent(AMQContentBody::shared_ptr content){
+void Channel::handleContent(AMQContentBody::shared_ptr content)
+{
messageBuilder.addContent(content);
}
@@ -306,14 +327,16 @@ void Channel::route(Message::shared_ptr msg, Deliverable& strategy) {
}
// Used by Basic
-void Channel::ack(uint64_t deliveryTag, bool multiple){
+void Channel::ack(uint64_t deliveryTag, bool multiple)
+{
if (multiple)
ack(0, deliveryTag);
else
ack(deliveryTag, deliveryTag);
}
-void Channel::ack(uint64_t firstTag, uint64_t lastTag){
+void Channel::ack(uint64_t firstTag, uint64_t lastTag)
+{
if (txBuffer.get()) {
accumulatedAck.update(firstTag, lastTag);
//TODO: I think the outstanding prefetch size & count should be updated at this point...
@@ -355,7 +378,8 @@ void Channel::ack(uint64_t firstTag, uint64_t lastTag){
}
}
-void Channel::recover(bool requeue){
+void Channel::recover(bool requeue)
+{
Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery
if(requeue){
@@ -368,12 +392,12 @@ void Channel::recover(bool requeue){
}
}
-bool Channel::get(DeliveryAdapter& adapter, Queue::shared_ptr queue, bool ackExpected){
+bool Channel::get(DeliveryToken::shared_ptr token, Queue::shared_ptr queue, bool ackExpected)
+{
Message::shared_ptr msg = queue->dequeue();
if(msg){
Mutex::ScopedLock locker(deliveryLock);
- uint64_t myDeliveryTag = adapter.getNextDeliveryTag();
- adapter.deliver(msg, myDeliveryTag);
+ uint64_t myDeliveryTag = out.deliver(msg, token);
if(ackExpected){
unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag));
}