diff options
Diffstat (limited to 'cpp/lib/broker/BrokerChannel.cpp')
| -rw-r--r-- | cpp/lib/broker/BrokerChannel.cpp | 257 | 
1 files changed, 257 insertions, 0 deletions
| diff --git a/cpp/lib/broker/BrokerChannel.cpp b/cpp/lib/broker/BrokerChannel.cpp new file mode 100644 index 0000000000..42e45dd291 --- /dev/null +++ b/cpp/lib/broker/BrokerChannel.cpp @@ -0,0 +1,257 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements.  See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership.  The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.  You may obtain a copy of the License at + *  + *   http://www.apache.org/licenses/LICENSE-2.0 + *  + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied.  See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +#include <BrokerChannel.h> +#include <QpidError.h> +#include <iostream> +#include <sstream> +#include <assert.h> + +using std::mem_fun_ref; +using std::bind2nd; +using namespace qpid::broker; +using namespace qpid::framing; +using namespace qpid::sys; + + +Channel::Channel(OutputHandler* _out, int _id, u_int32_t _framesize) : +    id(_id),  +    out(_out),  +    currentDeliveryTag(1), +    transactional(false), +    prefetchSize(0), +    prefetchCount(0), +    framesize(_framesize), +    tagGenerator("sgen"), +    store(0), +    messageBuilder(this){ + +    outstanding.reset(); +} + +Channel::~Channel(){ +} + +bool Channel::exists(const string& consumerTag){ +    return consumers.find(consumerTag) != consumers.end(); +} + +void Channel::consume(string& tag, Queue::shared_ptr queue, bool acks, bool exclusive, ConnectionToken* const connection){ +	if(tag.empty()) tag = tagGenerator.generate(); +    ConsumerImpl* c(new ConsumerImpl(this, tag, queue, connection, acks)); +    try{ +        queue->consume(c, exclusive);//may throw exception +        consumers[tag] = c; +    }catch(ExclusiveAccessException& e){ +        delete c; +        throw e; +    } +} + +void Channel::cancel(consumer_iterator i){ +    ConsumerImpl* c = i->second; +    consumers.erase(i); +    if(c){ +        c->cancel(); +        delete c; +    } +} + +void Channel::cancel(const string& tag){ +    consumer_iterator i = consumers.find(tag); +    if(i != consumers.end()){ +        cancel(i); +    } +} + +void Channel::close(){ +    //cancel all consumers +    for(consumer_iterator i = consumers.begin(); i != consumers.end(); i = consumers.begin() ){ +        cancel(i); +    } +    //requeue: +    recover(true); +} + +void Channel::begin(){ +    transactional = true; +} + +void Channel::commit(){ +    TxAck txAck(accumulatedAck, unacked); +    txBuffer.enlist(&txAck); +    if(txBuffer.prepare(store)){ +        txBuffer.commit(); +    } +    accumulatedAck.clear(); +} + +void Channel::rollback(){ +    txBuffer.rollback(); +    accumulatedAck.clear(); +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, Queue::shared_ptr& queue, bool ackExpected){ +    Mutex::ScopedLock locker(deliveryLock); + +    u_int64_t deliveryTag = currentDeliveryTag++; +    if(ackExpected){ +        unacked.push_back(DeliveryRecord(msg, queue, consumerTag, deliveryTag)); +        outstanding.size += msg->contentSize(); +        outstanding.count++; +    } +    //send deliver method, header and content(s) +    msg->deliver(out, id, consumerTag, deliveryTag, framesize); +} + +bool Channel::checkPrefetch(Message::shared_ptr& msg){ +    Mutex::ScopedLock locker(deliveryLock); +    bool countOk = !prefetchCount || prefetchCount > unacked.size(); +    bool sizeOk = !prefetchSize || prefetchSize > msg->contentSize() + outstanding.size || unacked.empty(); +    return countOk && sizeOk; +} + +Channel::ConsumerImpl::ConsumerImpl(Channel* _parent, const string& _tag,  +                                    Queue::shared_ptr _queue,  +                                    ConnectionToken* const _connection, bool ack) : parent(_parent),  +                                                                                    tag(_tag),  +                                                                                    queue(_queue), +                                                                                    connection(_connection), +                                                                                    ackExpected(ack),  +                                                                                    blocked(false){ +} + +bool Channel::ConsumerImpl::deliver(Message::shared_ptr& msg){ +    if(!connection || connection != msg->getPublisher()){//check for no_local +        if(ackExpected && !parent->checkPrefetch(msg)){ +            blocked = true; +        }else{ +            blocked = false; +            parent->deliver(msg, tag, queue, ackExpected); +            return true; +        } +    } +    return false; +} + +void Channel::ConsumerImpl::cancel(){ +    if(queue) queue->cancel(this); +} + +void Channel::ConsumerImpl::requestDispatch(){ +    if(blocked) queue->dispatch(); +} + +void Channel::handlePublish(Message* _message, Exchange::shared_ptr _exchange){ +    Message::shared_ptr message(_message); +    exchange = _exchange; +    messageBuilder.initialise(message); +} + +void Channel::handleHeader(AMQHeaderBody::shared_ptr header){ +    messageBuilder.setHeader(header); +    //at this point, decide based on the size of the message whether we want +    //to stage it by saving content directly to disk as it arrives +} + +void Channel::handleContent(AMQContentBody::shared_ptr content){ +    messageBuilder.addContent(content); +} + +void Channel::complete(Message::shared_ptr& msg){ +    if(exchange){ +        if(transactional){ +            TxPublish* deliverable = new TxPublish(msg); +            exchange->route(*deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); +            txBuffer.enlist(new DeletingTxOp(deliverable)); +        }else{ +            DeliverableMessage deliverable(msg); +            exchange->route(deliverable, msg->getRoutingKey(), &(msg->getHeaderProperties()->getHeaders())); +        } +        exchange.reset(); +    }else{ +        std::cout << "Exchange not known in Channel::complete(Message::shared_ptr&)" << std::endl; +    } +} + +void Channel::ack(u_int64_t deliveryTag, bool multiple){ +    if(transactional){ +        accumulatedAck.update(deliveryTag, multiple); +        //TODO: I think the outstanding prefetch size & count should be updated at this point... +        //TODO: ...this may then necessitate dispatching to consumers +    }else{ +        Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery +     +        ack_iterator i = find_if(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::matches), deliveryTag)); +        if(i == unacked.end()){ +            throw InvalidAckException(); +        }else if(multiple){      +            ack_iterator end = ++i; +            for_each(unacked.begin(), end, bind2nd(mem_fun_ref(&DeliveryRecord::discard), 0)); +            unacked.erase(unacked.begin(), end); + +            //recalculate the prefetch: +            outstanding.reset(); +            for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::addTo), &outstanding)); +        }else{ +            i->discard(); +            i->subtractFrom(&outstanding); +            unacked.erase(i);         +        } + +        //if the prefetch limit had previously been reached, there may +        //be messages that can be now be delivered +        for(consumer_iterator j = consumers.begin(); j != consumers.end(); j++){ +            j->second->requestDispatch(); +        } +    } +} + +void Channel::recover(bool requeue){ +    Mutex::ScopedLock locker(deliveryLock);//need to synchronize with possible concurrent delivery + +    if(requeue){ +        outstanding.reset(); +        std::list<DeliveryRecord> copy = unacked; +        unacked.clear(); +        for_each(copy.begin(), copy.end(), mem_fun_ref(&DeliveryRecord::requeue)); +    }else{ +        for_each(unacked.begin(), unacked.end(), bind2nd(mem_fun_ref(&DeliveryRecord::redeliver), this));         +    } +} + +bool Channel::get(Queue::shared_ptr queue, bool ackExpected){ +    Message::shared_ptr msg = queue->dequeue(); +    if(msg){ +        Mutex::ScopedLock locker(deliveryLock); +        u_int64_t myDeliveryTag = currentDeliveryTag++; +        msg->sendGetOk(out, id, queue->getMessageCount() + 1, myDeliveryTag, framesize); +        if(ackExpected){ +            unacked.push_back(DeliveryRecord(msg, queue, myDeliveryTag)); +        } +        return true; +    }else{ +        return false; +    } +} + +void Channel::deliver(Message::shared_ptr& msg, const string& consumerTag, u_int64_t deliveryTag){ +    msg->deliver(out, id, consumerTag, deliveryTag, framesize); +} | 
