diff options
| author | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-01-15 16:13:58 +1300 |
|---|---|---|
| committer | Tony Garnock-Jones <tonyg@kcbbs.gen.nz> | 2009-01-15 16:13:58 +1300 |
| commit | beb6e44312e4dc7182c51e88dd443cfe412386d0 (patch) | |
| tree | e55d32aba42dbc71b69b89cd09bcbd617faf2fc9 | |
| parent | abe958ccd670441b92771329acf3cd383709215d (diff) | |
| download | rabbitmq-server-git-beb6e44312e4dc7182c51e88dd443cfe412386d0.tar.gz | |
Implement routing for headers exchange.
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 111 |
2 files changed, 101 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c77d..cbdc9e48fe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -273,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 03478a4d49..a4e6d2196d 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -213,15 +214,19 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% @@ -230,7 +235,7 @@ simple_publish(Mandatory, Immediate, %% current exchange types that is at most once. %% %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> +route(#exchange{name = Name, type = topic}, RoutingKey, _Content) -> Query = qlc:q([QName || #route{binding = #binding{ exchange_name = ExchangeName, @@ -254,13 +259,37 @@ route(#exchange{name = Name, type = topic}, RoutingKey) -> topic_matches(BindingKey, RoutingKey)] end); -route(X = #exchange{type = headers}, _RoutingKey) -> - exit(headers_unimplemented); +route(#exchange{name = Name, type = headers}, + _RoutingKey, + #content{properties = #'P_basic'{headers = Headers0}}) -> + Headers = case Headers0 of + undefined -> []; + _ -> sort_arguments(Headers0) + end, + Query = qlc:q([QName || + #route{binding = #binding{exchange_name = ExchangeName, + queue_name = QName, + args = Spec}} + <- mnesia:table(route), + ExchangeName == Name, + headers_match(Spec, Headers)]), + lookup_qpids( + try + mnesia:async_dirty(fun qlc:e/1, [Query]) + catch exit:{aborted, {badarg, _}} -> + %% work around OTP-7025, which was fixed in R12B-1, by + %% falling back on a less efficient method + [QName || #route{binding = #binding{queue_name = QName, args = Spec}} + <- mnesia:dirty_match_object( + #route{binding = #binding{exchange_name = Name, + _ = '_'}}), + headers_match(Spec, Headers)] + end); -route(X = #exchange{type = fanout}, _) -> +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> route_internal(X, '_'); -route(X = #exchange{type = direct}, RoutingKey) -> +route(X = #exchange{type = direct}, RoutingKey, _Content) -> route_internal(X, RoutingKey). route_internal(#exchange{name = Name}, RoutingKey) -> @@ -382,7 +411,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -434,6 +463,66 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p (value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], D, AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, D, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(P = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], AllMatch, AnyMatch, MatchKind) + when PK > DK -> + headers_match(P, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], D = [{DK, _DT, _DV} | _], _AllMatch, AnyMatch, MatchKind) + when PK < DK -> + headers_match(PRest, D, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], AllMatch, AnyMatch, MatchKind) + when PK == DK -> + if + PT == void -> + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + headers_match(PRest, DRest, AllMatch, true, MatchKind); + PT =/= DT -> + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + headers_match(PRest, DRest, false, AnyMatch, MatchKind); + PV == DV -> + headers_match(PRest, DRest, AllMatch, true, MatchKind); + true -> + headers_match(PRest, DRest, false, AnyMatch, MatchKind) + end. + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. |
