summaryrefslogtreecommitdiff
path: root/qpid/cpp/src/qpid/broker/amqp/Session.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'qpid/cpp/src/qpid/broker/amqp/Session.cpp')
-rw-r--r--qpid/cpp/src/qpid/broker/amqp/Session.cpp57
1 files changed, 56 insertions, 1 deletions
diff --git a/qpid/cpp/src/qpid/broker/amqp/Session.cpp b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
index 7780456bd1..2f793aea14 100644
--- a/qpid/cpp/src/qpid/broker/amqp/Session.cpp
+++ b/qpid/cpp/src/qpid/broker/amqp/Session.cpp
@@ -206,6 +206,19 @@ class IncomingToExchange : public DecodingIncoming
bool isControllingLink;
};
+class AnonymousRelay : public DecodingIncoming
+{
+ public:
+ AnonymousRelay(Broker& b, Connection& c, Session& p, pn_link_t* l)
+ : DecodingIncoming(l, b, p, std::string(), "ANONYMOUS-RELAY", pn_link_name(l)), authorise(p.getAuthorise()), context(c)
+ {}
+ void handle(qpid::broker::Message& m, qpid::broker::TxBuffer*);
+ private:
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ Authorise& authorise;
+ BrokerContext& context;
+};
+
class IncomingToCoordinator : public DecodingIncoming
{
public:
@@ -411,7 +424,13 @@ void Session::attach(pn_link_t* link)
std::string name;
if (pn_terminus_get_type(target) == PN_UNSPECIFIED) {
pn_terminus_set_type(pn_link_target(link), PN_UNSPECIFIED);
- throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, "No target specified!");
+ authorise.access("ANONYMOUS-RELAY");
+ boost::shared_ptr<Incoming> r(new AnonymousRelay(connection.getBroker(), connection, *this, link));
+ incoming[link] = r;
+ if (connection.getBroker().isAuthenticating() && !connection.isLink())
+ r->verify(connection.getUserId(), connection.getBroker().getRealm());
+ QPID_LOG(debug, "Incoming link attached for ANONYMOUS-RELAY");
+ return;
} else if (pn_terminus_get_type(target) == PN_COORDINATOR) {
QPID_LOG(debug, "Received attach request for incoming link to transaction coordinator on " << this);
boost::shared_ptr<Incoming> i(new IncomingToCoordinator(link, connection.getBroker(), *this));
@@ -937,6 +956,42 @@ void IncomingToExchange::handle(qpid::broker::Message& message, qpid::broker::Tx
}
}
+void AnonymousRelay::handle(qpid::broker::Message& message, qpid::broker::TxBuffer* transaction)
+{
+ // need to retrieve AMQP 1.0 'to' field and resolve it to a queue or exchange
+ std::string dest = message.getTo();
+ QPID_LOG(debug, "AnonymousRelay received message for " << dest);
+ boost::shared_ptr<qpid::broker::Exchange> exchange;
+ boost::shared_ptr<qpid::broker::Queue> queue;
+ boost::shared_ptr<qpid::broker::amqp::Topic> topic;
+
+ queue = context.getBroker().getQueues().find(dest);
+ if (!queue) {
+ topic = context.getTopics().get(dest);
+ if (topic) {
+ exchange = topic->getExchange();
+ } else {
+ exchange = context.getBroker().getExchanges().find(dest);
+ }
+ }
+
+ try {
+ if (queue) {
+ authorise.incoming(queue);
+ queue->deliver(message, transaction);
+ } else if (exchange) {
+ authorise.route(exchange, message);
+ DeliverableMessage deliverable(message, transaction);
+ exchange->route(deliverable);
+ } else {
+ QPID_LOG(info, "AnonymousRelay dropping message for " << dest);
+ }
+ } catch (const qpid::SessionException& e) {
+ throw Exception(qpid::amqp::error_conditions::PRECONDITION_FAILED, e.what());
+ }
+
+}
+
void IncomingToCoordinator::deliver(boost::intrusive_ptr<qpid::broker::amqp::Message> message, pn_delivery_t* delivery)
{
if (message && message->isTypedBody()) {