summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-09-30 17:22:47 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-09-30 17:22:47 +0100
commita02b82f0fdb1acdfc74d83e8f38c78587748a962 (patch)
tree4e17dee7db6d12fa1f499fdf3dad0c4128f8221a
parent8fbb9ed3f1a354ecefe431e97e4f8df223bce304 (diff)
downloadrabbitmq-server-git-a02b82f0fdb1acdfc74d83e8f38c78587748a962.tar.gz
Move lookup_qpids into router and associated changes to deliver API
-rw-r--r--src/rabbit_exchange.erl12
-rw-r--r--src/rabbit_router.erl21
2 files changed, 17 insertions, 16 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 62c1417962..db5a46bd0a 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -229,8 +229,7 @@ info_all(VHostPath, Items) -> map(VHostPath, fun (X) -> info(X, Items) end).
publish(X = #exchange{name = XName}, Delivery) ->
QNames = find_qnames(Delivery, queue:from_list([X]),
sets:from_list([XName]), []),
- QPids = lookup_qpids(QNames),
- rabbit_router:deliver(QPids, Delivery).
+ rabbit_router:deliver(QNames, Delivery).
find_qnames(Delivery, WorkList, SeenXs, QNames) ->
case queue:out(WorkList) of
@@ -270,15 +269,6 @@ process_alternate(#exchange{name = XName, arguments = Args}, []) ->
process_alternate(_X, Results) ->
Results.
-lookup_qpids(QNames) ->
- lists:foldl(
- fun (Key, Acc) ->
- case mnesia:dirty_read({rabbit_queue, Key}) of
- [#amqqueue{pid = QPid}] -> [QPid | Acc];
- [] -> Acc
- end
- end, [], QNames).
-
call_with_exchange(XName, Fun) ->
rabbit_misc:execute_mnesia_transaction(
fun () -> case mnesia:read({rabbit_exchange, XName}) of
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 6f91633ecb..ebe281621c 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -46,8 +46,8 @@
-type(qpids() :: [pid()]).
-type(match_result() :: [rabbit_types:binding_destination()]).
--spec(deliver/2 ::
- (qpids(), rabbit_types:delivery()) -> {routing_result(), qpids()}).
+-spec(deliver/2 :: ([rabbit_amqqueue:name()], rabbit_types:delivery()) ->
+ {routing_result(), qpids()}).
-spec(match_bindings/2 :: (rabbit_types:binding_source(),
fun ((rabbit_types:binding()) -> boolean())) ->
match_result()).
@@ -58,8 +58,8 @@
%%----------------------------------------------------------------------------
-deliver(QPids, Delivery = #delivery{mandatory = false,
- immediate = false}) ->
+deliver(QNames, Delivery = #delivery{mandatory = false,
+ immediate = false}) ->
%% optimisation: when Mandatory = false and Immediate = false,
%% rabbit_amqqueue:deliver will deliver the message to the queue
%% process asynchronously, and return true, which means all the
@@ -67,11 +67,13 @@ deliver(QPids, Delivery = #delivery{mandatory = false,
%% fire-and-forget cast here and return the QPids - the semantics
%% is preserved. This scales much better than the non-immediate
%% case below.
+ QPids = lookup_qpids(QNames),
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, Delivery) end),
{routed, QPids};
-deliver(QPids, Delivery) ->
+deliver(QNames, Delivery) ->
+ QPids = lookup_qpids(QNames),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -82,6 +84,15 @@ deliver(QPids, Delivery) ->
check_delivery(Delivery#delivery.mandatory, Delivery#delivery.immediate,
{Routed, Handled}).
+lookup_qpids(QNames) ->
+ lists:foldl(
+ fun (Key, Acc) ->
+ case mnesia:dirty_read({rabbit_queue, Key}) of
+ [#amqqueue{pid = QPid}] -> [QPid | Acc];
+ [] -> Acc
+ end
+ end, [], QNames).
+
%% TODO: Maybe this should be handled by a cursor instead.
%% TODO: This causes a full scan for each entry with the same source
match_bindings(SrcName, Match) ->