summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_exchange.erl45
1 files changed, 26 insertions, 19 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 13fed093f4..5142ac9f85 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -187,11 +187,12 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
% This matches for the direct exchanges and tries to short-cut the routing table
% for the default queue if that is what the user supplied
-route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) ->
- case route_internal(Name, RoutingKey) of
- [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end);
- Other -> Other
- end;
+
+% route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) ->
+% case route_internal(Name, RoutingKey) of
+% [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end);
+% Other -> Other
+% end;
route(#exchange{name = Name, type = Type}, RoutingKey) ->
route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end).
@@ -199,26 +200,32 @@ route(#exchange{name = Name, type = Type}, RoutingKey) ->
% This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
% This routes directly to queues, avoiding any lookup of routes
-route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) ->
- Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
- Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]),
- mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
+
+% route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) ->
+% Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
+% Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]),
+% mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
% This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
route_internal(Exchange, RoutingKey, MatchFun) ->
- Query = qlc:q([QPid || #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = QueueName,
+ Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = QName,
key = BindingKey}} <- mnesia:table(route),
- #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
ExchangeName == Exchange,
- QueueName == Queue,
- MatchFun(BindingKey, RoutingKey)],
- % This is put in as a quick and dirty way to preventing double routing
- % but it is probably very expensive to maintain
- {unique, true}),
- mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
-
+ MatchFun(BindingKey, RoutingKey)]),
+ Fun = fun() -> qlc:e(Query) end,
+ {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]),
+ Set = sets:from_list(L),
+ Fun2 = fun() ->
+ sets:fold(
+ fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}),
+ [QPid] ++ Acc end,
+ [], Set) end,
+ {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]),
+ io:format("Time 1 -> ~p~n",[Time1]),
+ io:format("Time 2 -> ~p~n",[Time2]),
+ QPids.
% Should all of the route and binding management not be refactored to it's own module
% Especially seeing as unbind will have to be implemented for 0.91 ?