/* * * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "Session.h" #include "Outgoing.h" #include "Message.h" #include "ManagedConnection.h" #include "qpid/broker/AsyncCompletion.h" #include "qpid/broker/Broker.h" #include "qpid/broker/DeliverableMessage.h" #include "qpid/broker/Exchange.h" #include "qpid/broker/DirectExchange.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/FanOutExchange.h" #include "qpid/broker/Message.h" #include "qpid/broker/Queue.h" #include "qpid/broker/TopicExchange.h" #include "qpid/broker/amqp/Filter.h" #include "qpid/broker/amqp/NodeProperties.h" #include "qpid/framing/AMQFrame.h" #include "qpid/framing/FieldTable.h" #include "qpid/framing/MessageTransferBody.h" #include "qpid/log/Statement.h" #include #include #include #include extern "C" { #include } namespace qpid { namespace broker { namespace amqp { class Target { public: Target(pn_link_t* l) : credit(100), window(0), link(l) {} virtual ~Target() {} bool flow(); bool needFlow(); virtual void handle(qpid::broker::Message& m) = 0;//TODO: revise this for proper message protected: const uint32_t credit; uint32_t window; pn_link_t* link; }; class Queue : public Target { public: Queue(boost::shared_ptr q, pn_link_t* l) : Target(l), queue(q) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr queue; }; class Exchange : public Target { public: Exchange(boost::shared_ptr e, pn_link_t* l) : Target(l), exchange(e) {} void handle(qpid::broker::Message& m); private: boost::shared_ptr exchange; }; Session::Session(pn_session_t* s, qpid::broker::Broker& b, ManagedConnection& c, qpid::sys::OutputControl& o) : ManagedSession(b, c, (boost::format("%1%") % s).str()), session(s), broker(b), connection(c), out(o), deleted(false) {} Session::ResolvedNode Session::resolve(const std::string name, pn_terminus_t* terminus) { ResolvedNode node; node.exchange = broker.getExchanges().find(name); node.queue = broker.getQueues().find(name); if (!node.queue && !node.exchange && pn_terminus_is_dynamic(terminus)) { //TODO: handle dynamic creation //is it a queue or an exchange? NodeProperties properties; properties.read(pn_terminus_properties(terminus)); if (properties.isQueue()) { node.queue = broker.createQueue(name, properties.getQueueSettings(), this, properties.getAlternateExchange(), connection.getUserid(), connection.getId()).first; } else { qpid::framing::FieldTable args; node.exchange = broker.createExchange(name, properties.getExchangeType(), properties.isDurable(), properties.getAlternateExchange(), args, connection.getUserid(), connection.getId()).first; } } else if (node.queue && node.exchange) { QPID_LOG_CAT(warning, protocol, "Ambiguous node name; " << name << " could be queue or exchange, assuming queue"); node.exchange.reset(); } return node; } void Session::attach(pn_link_t* link) { if (pn_link_is_sender(link)) { pn_terminus_t* source = pn_link_remote_source(link); //i.e a subscription if (pn_terminus_get_type(source) == PN_UNSPECIFIED) { throw qpid::Exception("No source specified!");/*invalid-field?*/ } std::string name = pn_terminus_get_address(source); QPID_LOG(debug, "Received attach request for outgoing link from " << name); pn_terminus_set_address(pn_link_source(link), name.c_str()); ResolvedNode node = resolve(name, source); Filter filter; filter.read(pn_terminus_filter(source)); if (node.queue) { boost::shared_ptr q(new Outgoing(broker, node.queue, link, *this, out, false)); q->init(); if (filter.hasSubjectFilter()) { q->setSubjectFilter(filter.getSubjectFilter()); } senders[link] = q; } else if (node.exchange) { QueueSettings settings(false, true); //TODO: populate settings from source details when available from engine boost::shared_ptr queue = broker.createQueue(name + qpid::types::Uuid(true).str(), settings, this, "", connection.getUserid(), connection.getId()).first; if (filter.hasSubjectFilter()) { filter.bind(node.exchange, queue); filter.write(pn_terminus_filter(pn_link_source(link))); } else if (node.exchange->getType() == FanOutExchange::typeName) { node.exchange->bind(queue, std::string(), 0); } else if (node.exchange->getType() == TopicExchange::typeName) { node.exchange->bind(queue, "#", 0); } else { throw qpid::Exception("Exchange type requires a filter: " + node.exchange->getType());/*not-supported?*/ } boost::shared_ptr q(new Outgoing(broker, queue, link, *this, out, true)); senders[link] = q; q->init(); } else { pn_terminus_set_type(pn_link_source(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } QPID_LOG(debug, "Outgoing link attached"); } else { pn_terminus_t* target = pn_link_remote_target(link); if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { throw qpid::Exception("No target specified!");/*invalid field?*/ } std::string name = pn_terminus_get_address(target); QPID_LOG(debug, "Received attach request for incoming link to " << name); pn_terminus_set_address(pn_link_target(link), name.c_str()); ResolvedNode node = resolve(name, target); if (node.queue) { boost::shared_ptr q(new Queue(node.queue, link)); targets[link] = q; } else if (node.exchange) { boost::shared_ptr e(new Exchange(node.exchange, link)); targets[link] = e; } else { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); throw qpid::Exception("Node not found: " + name);/*not-found*/ } QPID_LOG(debug, "Incoming link attached"); } } void Session::detach(pn_link_t* link) { if (pn_link_is_sender(link)) { Senders::iterator i = senders.find(link); if (i != senders.end()) { i->second->detached(); senders.erase(i); QPID_LOG(debug, "Outgoing link detached"); } } else { targets.erase(link); QPID_LOG(debug, "Incoming link detached"); } } namespace { class Transfer : public qpid::broker::AsyncCompletion::Callback { public: Transfer(pn_delivery_t* d, boost::shared_ptr s) : delivery(d), session(s) {} void completed(bool sync) { session->accepted(delivery, sync); } boost::intrusive_ptr clone() { boost::intrusive_ptr copy(new Transfer(delivery, session)); return copy; } private: pn_delivery_t* delivery; boost::shared_ptr session; }; } void Session::accepted(pn_delivery_t* delivery, bool sync) { if (sync) { //this is on IO thread pn_delivery_update(delivery, PN_ACCEPTED); pn_delivery_settle(delivery);//do we need to check settlement modes/orders? incomingMessageAccepted(); } else { //this is not on IO thread, need to delay processing until on IO thread qpid::sys::Mutex::ScopedLock l(lock); if (!deleted) { completed.push_back(delivery); out.activateOutput(); } } } void Session::incoming(pn_link_t* link, pn_delivery_t* delivery) { pn_delivery_tag_t tag = pn_delivery_tag(delivery); QPID_LOG(debug, "received delivery: " << std::string(tag.bytes, tag.size)); boost::intrusive_ptr received(new Message(pn_delivery_pending(delivery))); /*ssize_t read = */pn_link_recv(link, received->getData(), received->getSize()); received->scan(); pn_link_advance(link); qpid::broker::Message message(received, received); incomingMessageReceived(); Targets::iterator target = targets.find(link); if (target == targets.end()) { QPID_LOG(error, "Received message on unknown link"); pn_delivery_update(delivery, PN_REJECTED); pn_delivery_settle(delivery);//do we need to check settlement modes/orders? incomingMessageRejected(); } else { target->second->handle(message); received->begin(); Transfer t(delivery, shared_from_this()); received->end(t); if (target->second->needFlow()) out.activateOutput(); } } void Session::outgoing(pn_link_t* link, pn_delivery_t* delivery) { Senders::iterator sender = senders.find(link); if (sender == senders.end()) { QPID_LOG(error, "Delivery returned for unknown link"); } else { sender->second->handle(delivery); } } bool Session::dispatch() { bool output(false); for (Senders::iterator s = senders.begin(); s != senders.end(); ++s) { if (s->second->dispatch()) output = true; } if (completed.size()) { output = true; std::deque copy; { qpid::sys::Mutex::ScopedLock l(lock); completed.swap(copy); } for (std::deque::iterator i = copy.begin(); i != copy.end(); ++i) { accepted(*i, true); } } for (Targets::iterator t = targets.begin(); t != targets.end(); ++t) { if (t->second->flow()) output = true; } return output; } void Session::close() { for (Senders::iterator i = senders.begin(); i != senders.end(); ++i) { i->second->detached(); } senders.clear(); targets.clear();//at present no explicit cleanup required for targets QPID_LOG(debug, "Session closed, all senders cancelled."); qpid::sys::Mutex::ScopedLock l(lock); deleted = true; } void Queue::handle(qpid::broker::Message& message) { queue->deliver(message); --window; } void Exchange::handle(qpid::broker::Message& message) { DeliverableMessage deliverable(message, 0); exchange->route(deliverable); --window; } bool Target::flow() { bool issue = window < credit; if (issue) { pn_link_flow(link, credit - window);//TODO: proper flow control window = credit; } return issue; } bool Target::needFlow() { return window <= (credit/2); } }}} // namespace qpid::broker::amqp