/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "SessionContext.h" #include "SenderContext.h" #include "ReceiverContext.h" #include #include "qpid/messaging/Address.h" #include "qpid/messaging/Duration.h" #include "qpid/messaging/exceptions.h" #include "qpid/log/Statement.h" extern "C" { #include } namespace qpid { namespace messaging { namespace amqp { SessionContext::SessionContext(pn_connection_t* connection) : session(pn_session(connection)) {} SessionContext::~SessionContext() { senders.clear(); receivers.clear(); pn_session_free(session); } boost::shared_ptr SessionContext::createSender(const qpid::messaging::Address& address) { std::string name = address.getName(); int count = 1; for (SenderMap::const_iterator i = senders.find(name); i != senders.end(); i = senders.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } boost::shared_ptr s(new SenderContext(session, name, address)); senders[name] = s; return s; } boost::shared_ptr SessionContext::createReceiver(const qpid::messaging::Address& address) { std::string name = address.getName(); int count = 1; for (ReceiverMap::const_iterator i = receivers.find(name); i != receivers.end(); i = receivers.find(name)) { name = (boost::format("%1%_%2%") % address.getName() % ++count).str(); } boost::shared_ptr r(new ReceiverContext(session, name, address)); receivers[name] = r; return r; } boost::shared_ptr SessionContext::getSender(const std::string& name) const { SenderMap::const_iterator i = senders.find(name); if (i == senders.end()) { throw qpid::messaging::KeyError(std::string("No such sender") + name); } else { return i->second; } } boost::shared_ptr SessionContext::getReceiver(const std::string& name) const { ReceiverMap::const_iterator i = receivers.find(name); if (i == receivers.end()) { throw qpid::messaging::KeyError(std::string("No such receiver") + name); } else { return i->second; } } void SessionContext::closeReceiver(const std::string&) { } void SessionContext::closeSender(const std::string&) { } boost::shared_ptr SessionContext::nextReceiver(qpid::messaging::Duration /*timeout*/) { return boost::shared_ptr(); } uint32_t SessionContext::getReceivable() { return 0;//TODO } uint32_t SessionContext::getUnsettledAcks() { return 0;//TODO } qpid::framing::SequenceNumber SessionContext::record(pn_delivery_t* delivery) { qpid::framing::SequenceNumber id = next++; unacked[id] = delivery; QPID_LOG(debug, "Recorded delivery " << id << " -> " << delivery); return id; } void SessionContext::acknowledge(DeliveryMap::iterator begin, DeliveryMap::iterator end) { for (DeliveryMap::iterator i = begin; i != end; ++i) { QPID_LOG(debug, "Setting disposition for delivery " << i->first << " -> " << i->second); pn_delivery_update(i->second, PN_ACCEPTED); pn_delivery_settle(i->second);//TODO: different settlement modes? } unacked.erase(begin, end); } void SessionContext::acknowledge() { QPID_LOG(debug, "acknowledging all " << unacked.size() << " messages"); acknowledge(unacked.begin(), unacked.end()); } void SessionContext::acknowledge(const qpid::framing::SequenceNumber& id, bool cumulative) { DeliveryMap::iterator i = unacked.find(id); if (i != unacked.end()) { acknowledge(cumulative ? unacked.begin() : i, ++i); } } bool SessionContext::settled() { bool result = true; for (SenderMap::iterator i = senders.begin(); i != senders.end(); ++i) { if (!i->second->settled()) result = false; } return result; } }}} // namespace qpid::messaging::amqp