summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorBen Hood <0x6e6562@gmail.com>2008-09-04 22:32:32 +0100
committerBen Hood <0x6e6562@gmail.com>2008-09-04 22:32:32 +0100
commit4391947d60600f8a7428e2d49ca493be49a04c79 (patch)
tree3ca67516a59ee6c476490413414b46279668c92c /src
parent10c0fc56adb744e59155fb7342d5238703ca4f89 (diff)
downloadrabbitmq-server-git-4391947d60600f8a7428e2d49ca493be49a04c79.tar.gz
Split out direct and topic routing
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_exchange.erl53
1 files changed, 24 insertions, 29 deletions
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 5142ac9f85..dd9a60a020 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -185,48 +185,43 @@ simple_publish(Mandatory, Immediate,
route(#exchange{name = Name, type = topic}, RoutingKey) ->
route_internal(Name, RoutingKey, fun topic_matches/2);
-% This matches for the direct exchanges and tries to short-cut the routing table
-% for the default queue if that is what the user supplied
-
-% route(#exchange{name = Name = #resource{name = <<>>}, type = direct}, RoutingKey) ->
-% case route_internal(Name, RoutingKey) of
-% [] -> route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end);
-% Other -> Other
-% end;
-
route(#exchange{name = Name, type = Type}, RoutingKey) ->
- route_internal(Name, RoutingKey, fun(X,Y) -> X == Y end).
+ route_internal(Name, RoutingKey).
% This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
% This routes directly to queues, avoiding any lookup of routes
-
-% route_internal(#resource{name = <<>>, virtual_host = VHostPath}, RoutingKey) ->
-% Query = qlc:q([QPid || #amqqueue{name = Queue, pid = QPid} <- mnesia:table(amqqueue),
-% Queue == rabbit_misc:r(VHostPath, queue, RoutingKey)]),
-% mnesia:activity(async_dirty, fun() -> qlc:e(Query) end).
-
+route_internal(#resource{name = Name, virtual_host = VHostPath}, RoutingKey) ->
+ MatchHead = #route{binding = #binding{exchange_name = '$1',
+ queue_name = '$2',
+ key = '$3'}},
+ %MatchHead = #person{name='$1', sex=male, age='$2', _='_'},
+ Guards = [{'==', '$1', Name}, {'==', '$3', RoutingKey}],
+ lookup_qpids(
+ mnesia:activity(async_dirty,
+ fun() -> mnesia:select(route,[{MatchHead, Guards, ['$2']}])
+ end)).
+
% This returns a list of QPids to route to.
% Maybe this should be handled by a cursor instead.
route_internal(Exchange, RoutingKey, MatchFun) ->
Query = qlc:q([QName || #route{binding = #binding{exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Exchange,
- MatchFun(BindingKey, RoutingKey)]),
- Fun = fun() -> qlc:e(Query) end,
- {Time1,L} = timer:tc(mnesia,activity,[async_dirty,Fun]),
- Set = sets:from_list(L),
- Fun2 = fun() ->
+ queue_name = QName,
+ key = BindingKey}} <- mnesia:table(route),
+ ExchangeName == Exchange,
+ % This causes a full table scan (see bug 19336)
+ MatchFun(BindingKey, RoutingKey)]),
+ lookup_qpids(mnesia:activity(async_dirty, fun() -> qlc:e(Query) end)).
+
+lookup_qpids(Queues) ->
+ Set = sets:from_list(Queues),
+ Fun = fun() ->
sets:fold(
fun(Key, Acc) -> [#amqqueue{pid = QPid}] = mnesia:read({amqqueue, Key}),
[QPid] ++ Acc end,
[], Set) end,
- {Time2, QPids} = timer:tc(mnesia, activity, [async_dirty,Fun2]),
- io:format("Time 1 -> ~p~n",[Time1]),
- io:format("Time 2 -> ~p~n",[Time2]),
- QPids.
-
+ mnesia:activity(async_dirty,Fun).
+
% Should all of the route and binding management not be refactored to it's own module
% Especially seeing as unbind will have to be implemented for 0.91 ?
delete_routes(Q = #amqqueue{name = Name}) ->