diff options
| -rw-r--r-- | src/rabbit_exchange.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 21 |
2 files changed, 17 insertions, 16 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 62c1417962..db5a46bd0a 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -229,8 +229,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end). publish(X = #exchange{name = XName}, Delivery) -> QNames = find_qnames(Delivery, queue:from_list([X]), sets:from_list([XName]), []), - QPids = lookup_qpids(QNames), - rabbit_router:deliver(QPids, Delivery). + rabbit_router:deliver(QNames, Delivery). find_qnames(Delivery, WorkList, SeenXs, QNames) -> case queue:out(WorkList) of @@ -270,15 +269,6 @@ process_alternate(#exchange{name = XName, arguments = Args}, []) -> process_alternate(_X, Results) -> Results. -lookup_qpids(QNames) -> - lists:foldl( - fun (Key, Acc) -> - case mnesia:dirty_read({rabbit_queue, Key}) of - [#amqqueue{pid = QPid}] -> [QPid | Acc]; - [] -> Acc - end - end, [], QNames). - call_with_exchange(XName, Fun) -> rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:read({rabbit_exchange, XName}) of diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 6f91633ecb..ebe281621c 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -46,8 +46,8 @@ -type(qpids() :: [pid()]). -type(match_result() :: [rabbit_types:binding_destination()]). --spec(deliver/2 :: - (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}). +-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) -> + {routing_result(), qpids()}). -spec(match_bindings/2 :: (rabbit_types:binding_source(), fun ((rabbit_types:binding()) -> boolean())) -> match_result()). @@ -58,8 +58,8 @@ %%---------------------------------------------------------------------------- -deliver(QPids, Delivery = #delivery{mandatory = false, - immediate = false}) -> +deliver(QNames, Delivery = #delivery{mandatory = false, + immediate = false}) -> %% optimisation: when Mandatory = false and Immediate = false, %% rabbit_amqqueue:deliver will deliver the message to the queue %% process asynchronously, and return true, which means all the @@ -67,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false, %% fire-and-forget cast here and return the QPids - the semantics %% is preserved. This scales much better than the non-immediate %% case below. + QPids = lookup_qpids(QNames), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end), {routed, QPids}; -deliver(QPids, Delivery) -> +deliver(QNames, Delivery) -> + QPids = lookup_qpids(QNames), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -82,6 +84,15 @@ deliver(QPids, Delivery) -> check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate, {Routed, Handled}). +lookup_qpids(QNames) -> + lists:foldl( + fun (Key, Acc) -> + case mnesia:dirty_read({rabbit_queue, Key}) of + [#amqqueue{pid = QPid}] -> [QPid | Acc]; + [] -> Acc + end + end, [], QNames). + %% TODO: Maybe this should be handled by a cursor instead. %% TODO: This causes a full scan for each entry with the same source match_bindings(SrcName, Match) -> |
