diff options
| author | Ben Hood <0x6e6562@gmail.com> | 2008-09-04 22:32:32 +0100 |
|---|---|---|
| committer | Ben Hood <0x6e6562@gmail.com> | 2008-09-04 22:32:32 +0100 |
| commit | 4391947d60600f8a7428e2d49ca493be49a04c79 (patch) | |
| tree | 3ca67516a59ee6c476490413414b46279668c92c /src | |
| parent | 10c0fc56adb744e59155fb7342d5238703ca4f89 (diff) | |
| download | rabbitmq-server-git-4391947d60600f8a7428e2d49ca493be49a04c79.tar.gz | |
Split out direct and topic routing
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_exchange.erl | 53 |
1 files changed, 24 insertions, 29 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 5142ac9f85..dd9a60a020 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -185,48 +185,43 @@ simple_publish(Mandatory, Immediate, route(#exchange{name = Name, type = topic}, RoutingKey) -> route_internal(Name, RoutingKey, fun topic_matches/2); -% This matches for the direct exchanges and tries to short-cut the routing table -% for the default queue if that is what the user supplied - -% route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) -> -% case route_internal(Name, RoutingKey) of -% [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end); -% Other -> Other -% end; - route(#exchange{name = Name, type = Type}, RoutingKey) -> - route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end). + route_internal(Name, RoutingKey). % This returns a list of QPids to route to. % Maybe this should be handled by a cursor instead. % This routes directly to queues, avoiding any lookup of routes - -% route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) -> -% Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue), -% Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]), -% mnesia:activity(async_dirty, fun() -> qlc:e(Query) end). - +route_internal(#resource{name = Name, virtual_host = VHostPath}, RoutingKey) -> + MatchHead = #route{binding = #binding{exchange_name = '$1', + queue_name = '$2', + key = '$3'}}, + %MatchHead = #person{name='$1', sex=male, age='$2', _='_'}, + Guards = [{'==', '$1', Name}, {'==', '$3', RoutingKey}], + lookup_qpids( + mnesia:activity(async_dirty, + fun() -> mnesia:select(route,[{MatchHead, Guards, ['$2']}]) + end)). + % This returns a list of QPids to route to. % Maybe this should be handled by a cursor instead. route_internal(Exchange, RoutingKey, MatchFun) -> Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Exchange, - MatchFun(BindingKey, RoutingKey)]), - Fun = fun() -> qlc:e(Query) end, - {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]), - Set = sets:from_list(L), - Fun2 = fun() -> + queue_name = QName, + key = BindingKey}} <- mnesia:table(route), + ExchangeName == Exchange, + % This causes a full table scan (see bug 19336) + MatchFun(BindingKey, RoutingKey)]), + lookup_qpids(mnesia:activity(async_dirty, fun() -> qlc:e(Query) end)). + +lookup_qpids(Queues) -> + Set = sets:from_list(Queues), + Fun = fun() -> sets:fold( fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}), [QPid] ++ Acc end, [], Set) end, - {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]), - io:format("Time 1 -> ~p~n",[Time1]), - io:format("Time 2 -> ~p~n",[Time2]), - QPids. - + mnesia:activity(async_dirty,Fun). + % Should all of the route and binding management not be refactored to it's own module % Especially seeing as unbind will have to be implemented for 0.91 ? delete_routes(Q = #amqqueue{name = Name}) -> |
