summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_channel.erl34
1 files changed, 31 insertions, 3 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index b2716ec478..a7e3f2bb0a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -317,7 +317,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
true -> rabbit_guid:guid();
false -> none
end,
- {noreply, publish(Mandatory, Immediate,
+ {noreply, publish(Exchange, Mandatory, Immediate,
#basic_message{exchange_name = ExchangeName,
routing_key = RoutingKey,
content = DecodedContent,
@@ -767,15 +767,43 @@ binding_action(Fun, ExchangeNameBin, QueueNameBin, RoutingKey, Arguments,
ok -> return_ok(State, NoWait, ReturnMethod)
end.
-publish(Mandatory, Immediate, Message, QPids,
+publish(X, Mandatory, Immediate, Message, QPids,
State = #ch{transaction_id = TxnKey, writer_pid = WriterPid}) ->
- Handled = deliver(QPids, Mandatory, Immediate, TxnKey,
+ Handled = deliver(X, QPids, Mandatory, Immediate, TxnKey,
Message, WriterPid),
case TxnKey of
none -> State;
_ -> add_tx_participants(Handled, State)
end.
+deliver(X, QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
+ case deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) of
+ [] ->
+ case lists:keysearch(<<"ume">>, 1, X#exchange.arguments) of
+ {value, {_, longstr, UmeNameBin}} ->
+ XName = X#exchange.name,
+ UmeName = rabbit_misc:r(XName, exchange, UmeNameBin),
+ case rabbit_exchange:lookup(UmeName) of
+ {ok, Ume} ->
+ #basic_message{routing_key = RK, content = C} =
+ Message,
+ deliver(Ume, rabbit_exchange:route(Ume, RK, C),
+ false, false, Txn, Message, WriterPid);
+ {error, not_found} ->
+ rabbit_log:warning(
+ "unroutable message exchange for ~s "
+ "does not exist: ~s",
+ [rabbit_misc:rs(XName),
+ rabbit_misc:rs(UmeName)]),
+ []
+ end;
+ false ->
+ []
+ end;
+ Handled ->
+ Handled
+ end.
+
deliver(QPids, Mandatory, Immediate, Txn, Message, WriterPid) ->
case rabbit_router:deliver(QPids, Mandatory, Immediate, Txn, Message) of
{ok, DeliveredQPids} -> DeliveredQPids;