diff options
| -rw-r--r-- | src/rabbit_exchange.erl | 45 |
1 files changed, 26 insertions, 19 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 13fed093f4..5142ac9f85 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -187,11 +187,12 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> % This matches for the direct exchanges and tries to short-cut the routing table % for the default queue if that is what the user supplied -route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) -> - case route_internal(Name, RoutingKey) of - [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end); - Other -> Other - end; + +% route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) -> +% case route_internal(Name, RoutingKey) of +% [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end); +% Other -> Other +% end; route(#exchange{name = Name, type = Type}, RoutingKey) -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end). @@ -199,26 +200,32 @@ route(#exchange{name = Name, type = Type}, RoutingKey) -> % This returns a list of QPids to route to. % Maybe this should be handled by a cursor instead. % This routes directly to queues, avoiding any lookup of routes -route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) -> - Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue), - Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]), - mnesia:activity(async_dirty, fun() -> qlc:e(Query) end). + +% route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) -> +% Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue), +% Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]), +% mnesia:activity(async_dirty, fun() -> qlc:e(Query) end). % This returns a list of QPids to route to. % Maybe this should be handled by a cursor instead. route_internal(Exchange, RoutingKey, MatchFun) -> - Query = qlc:q([QPid || #route{binding = #binding{exchange_name = ExchangeName, - queue_name = QueueName, + Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QName, key = BindingKey}} <- mnesia:table(route), - #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue), ExchangeName == Exchange, - QueueName == Queue, - MatchFun(BindingKey, RoutingKey)], - % This is put in as a quick and dirty way to preventing double routing - % but it is probably very expensive to maintain - {unique, true}), - mnesia:activity(async_dirty, fun() -> qlc:e(Query) end). - + MatchFun(BindingKey, RoutingKey)]), + Fun = fun() -> qlc:e(Query) end, + {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]), + Set = sets:from_list(L), + Fun2 = fun() -> + sets:fold( + fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}), + [QPid] ++ Acc end, + [], Set) end, + {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]), + io:format("Time 1 -> ~p~n",[Time1]), + io:format("Time 2 -> ~p~n",[Time2]), + QPids. % Should all of the route and binding management not be refactored to it's own module % Especially seeing as unbind will have to be implemented for 0.91 ? |
