diff options
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r-- | qpid/cpp/src/qpid/broker/amqp/Session.cpp | 57 |
1 files changed, 56 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp index 7780456bd1..2f793aea14 100644 --- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp +++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp @@ -206,6 +206,19 @@ class IncomingToExchange : public DecodingIncoming bool isControllingLink; }; +class AnonymousRelay : public DecodingIncoming +{ + public: + AnonymousRelay(Broker& b, Connection& c, Session& p, pn_link_t* l) + : DecodingIncoming(l, b, p, std::string(), "ANONYMOUS-RELAY", pn_link_name(l)), authorise(p.getAuthorise()), context(c) + {} + void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*); + private: + boost::shared_ptr<qpid::broker::Exchange> exchange; + Authorise& authorise; + BrokerContext& context; +}; + class IncomingToCoordinator : public DecodingIncoming { public: @@ -411,7 +424,13 @@ void Session::attach(pn_link_t* link) std::string name; if (pn_terminus_get_type(target) == PN_UNSPECIFIED) { pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED); - throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!"); + authorise.access("ANONYMOUS-RELAY"); + boost::shared_ptr<Incoming> r(new AnonymousRelay(connection.getBroker(), connection, *this, link)); + incoming[link] = r; + if (connection.getBroker().isAuthenticating() && !connection.isLink()) + r->verify(connection.getUserId(), connection.getBroker().getRealm()); + QPID_LOG(debug, "Incoming link attached for ANONYMOUS-RELAY"); + return; } else if (pn_terminus_get_type(target) == PN_COORDINATOR) { QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this); boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this)); @@ -937,6 +956,42 @@ void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::Tx } } +void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction) +{ + // need to retrieve AMQP 1.0 'to' field and resolve it to a queue or exchange + std::string dest = message.getTo(); + QPID_LOG(debug, "AnonymousRelay received message for " << dest); + boost::shared_ptr<qpid::broker::Exchange> exchange; + boost::shared_ptr<qpid::broker::Queue> queue; + boost::shared_ptr<qpid::broker::amqp::Topic> topic; + + queue = context.getBroker().getQueues().find(dest); + if (!queue) { + topic = context.getTopics().get(dest); + if (topic) { + exchange = topic->getExchange(); + } else { + exchange = context.getBroker().getExchanges().find(dest); + } + } + + try { + if (queue) { + authorise.incoming(queue); + queue->deliver(message, transaction); + } else if (exchange) { + authorise.route(exchange, message); + DeliverableMessage deliverable(message, transaction); + exchange->route(deliverable); + } else { + QPID_LOG(info, "AnonymousRelay dropping message for " << dest); + } + } catch (const qpid::SessionException& e) { + throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what()); + } + +} + void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery) { if (message && message->isTypedBody()) { |