diff options
| author | Matthias Radestock <matthias@lshift.net> | 2009-01-15 14:20:03 +0000 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2009-01-15 14:20:03 +0000 |
| commit | 7ca227c1d109aed0754a3e8ac7e63ce030a875fa (patch) | |
| tree | 426170b56fc0af55a0c5c89e6c94c18ebc074b3d | |
| parent | cef2795d474a216c6ba239b5071fe69779d2803d (diff) | |
| parent | d7e593b4f361040eca03a5337ed4bcb4e154775c (diff) | |
| download | rabbitmq-server-git-7ca227c1d109aed0754a3e8ac7e63ce030a875fa.tar.gz | |
merge bug20152 into default
| -rw-r--r-- | src/rabbit_access_control.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 135 |
3 files changed, 110 insertions, 29 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index b73090fc44..36270efddc 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -186,6 +186,8 @@ add_vhost(VHostPath) -> [{<<"">>, direct}, {<<"amq.direct">>, direct}, {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml {<<"amq.fanout">>, fanout}]], ok; [_] -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index ca2782c77d..cbdc9e48fe 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -273,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, content = DecodedContent, persistent_key = PersistentKey}, - rabbit_exchange:route(Exchange, RoutingKey), State)}; + rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)}; handle_method(#'basic.ack'{delivery_tag = DeliveryTag, multiple = Multiple}, diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 925c335cee..960e4945fe 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -37,11 +37,11 @@ -export([recover/0, declare/5, lookup/1, lookup_or_die/1, list/1, info/1, info/2, info_all/1, info_all/2, simple_publish/6, simple_publish/3, - route/2]). + route/3]). -export([add_binding/4, delete_binding/4, list_bindings/1]). -export([delete/2]). -export([delete_bindings_for_queue/1]). --export([check_type/1, assert_type/2, topic_matches/2]). +-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]). %% EXTENDED API -export([list_exchange_bindings/1]). @@ -77,7 +77,7 @@ (bool(), bool(), exchange_name(), routing_key(), binary(), binary()) -> publish_res()). -spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()). --spec(route/2 :: (exchange(), routing_key()) -> [pid()]). +-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]). -spec(add_binding/4 :: (exchange_name(), queue_name(), routing_key(), amqp_table()) -> bind_res() | {'error', 'durability_settings_incompatible'}). @@ -88,6 +88,7 @@ [{exchange_name(), queue_name(), routing_key(), amqp_table()}]). -spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok'). -spec(topic_matches/2 :: (binary(), binary()) -> bool()). +-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()). -spec(delete/2 :: (exchange_name(), bool()) -> 'ok' | not_found() | {'error', 'in_use'}). -spec(list_queue_bindings/1 :: (queue_name()) -> @@ -145,6 +146,8 @@ check_type(<<"direct">>) -> direct; check_type(<<"topic">>) -> topic; +check_type(<<"headers">>) -> + headers; check_type(T) -> rabbit_misc:protocol_error( command_invalid, "invalid exchange type '~s'", [T]). @@ -211,54 +214,69 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin, %% Usable by Erlang code that wants to publish messages. simple_publish(Mandatory, Immediate, Message = #basic_message{exchange_name = ExchangeName, - routing_key = RoutingKey}) -> + routing_key = RoutingKey, + content = Content}) -> case lookup(ExchangeName) of {ok, Exchange} -> - QPids = route(Exchange, RoutingKey), + QPids = route(Exchange, RoutingKey, Content), rabbit_router:deliver(QPids, Mandatory, Immediate, none, Message); {error, Error} -> {error, Error} end. +sort_arguments(Arguments) -> + lists:keysort(1, Arguments). + %% return the list of qpids to which a message with a given routing %% key, sent to a particular exchange, should be delivered. %% %% The function ensures that a qpid appears in the return list exactly %% as many times as a message should be delivered to it. With the %% current exchange types that is at most once. -%% +route(X = #exchange{type = topic}, RoutingKey, _Content) -> + match_bindings(X, fun (#binding{key = BindingKey}) -> + topic_matches(BindingKey, RoutingKey) + end); + +route(X = #exchange{type = headers}, _RoutingKey, Content) -> + Headers = case (Content#content.properties)#'P_basic'.headers of + undefined -> []; + H -> sort_arguments(H) + end, + match_bindings(X, fun (#binding{args = Spec}) -> + headers_match(Spec, Headers) + end); + +route(X = #exchange{type = fanout}, _RoutingKey, _Content) -> + match_routing_key(X, '_'); + +route(X = #exchange{type = direct}, RoutingKey, _Content) -> + match_routing_key(X, RoutingKey). + %% TODO: Maybe this should be handled by a cursor instead. -route(#exchange{name = Name, type = topic}, RoutingKey) -> - Query = qlc:q([QName || - #route{binding = #binding{ - exchange_name = ExchangeName, - queue_name = QName, - key = BindingKey}} <- mnesia:table(route), - ExchangeName == Name, - %% TODO: This causes a full scan for each entry - %% with the same exchange (see bug 19336) - topic_matches(BindingKey, RoutingKey)]), +%% TODO: This causes a full scan for each entry with the same exchange +match_bindings(#exchange{name = Name}, Match) -> + Query = qlc:q([QName || #route{binding = Binding = #binding{ + exchange_name = ExchangeName, + queue_name = QName}} <- + mnesia:table(route), + ExchangeName == Name, + Match(Binding)]), lookup_qpids( try mnesia:async_dirty(fun qlc:e/1, [Query]) catch exit:{aborted, {badarg, _}} -> %% work around OTP-7025, which was fixed in R12B-1, by %% falling back on a less efficient method - [QName || #route{binding = #binding{queue_name = QName, - key = BindingKey}} <- + [QName || #route{binding = Binding = #binding{ + queue_name = QName}} <- mnesia:dirty_match_object( #route{binding = #binding{exchange_name = Name, _ = '_'}}), - topic_matches(BindingKey, RoutingKey)] - end); - -route(X = #exchange{type = fanout}, _) -> - route_internal(X, '_'); - -route(X = #exchange{type = direct}, RoutingKey) -> - route_internal(X, RoutingKey). + Match(Binding)] + end). -route_internal(#exchange{name = Name}, RoutingKey) -> +match_routing_key(#exchange{name = Name}, RoutingKey) -> MatchHead = #route{binding = #binding{exchange_name = Name, queue_name = '$1', key = RoutingKey, @@ -377,7 +395,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) -> Binding = #binding{exchange_name = ExchangeName, queue_name = QueueName, key = RoutingKey, - args = Arguments}, + args = sort_arguments(Arguments)}, ok = case Durable of true -> Fun(durable_routes, #route{binding = Binding}, write); false -> ok @@ -429,6 +447,67 @@ reverse_binding(#binding{exchange_name = Exchange, key = Key, args = Args}. +default_headers_match_kind() -> all. + +parse_x_match(<<"all">>) -> all; +parse_x_match(<<"any">>) -> any; +parse_x_match(Other) -> + rabbit_log:warning("Invalid x-match field value ~p; expected all or any", + [Other]), + default_headers_match_kind(). + +%% Horrendous matching algorithm. Depends for its merge-like +%% (linear-time) behaviour on the lists:keysort (sort_arguments) that +%% route/3 and sync_binding/6 do. +%% +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY. +%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!! +%% +headers_match(Pattern, Data) -> + MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of + {value, {_, longstr, MK}} -> parse_x_match(MK); + {value, {_, Type, MK}} -> + rabbit_log:warning("Invalid x-match field type ~p " + "(value ~p); expected longstr", + [Type, MK]), + default_headers_match_kind(); + _ -> default_headers_match_kind() + end, + headers_match(Pattern, Data, true, false, MatchKind). + +headers_match([], _Data, AllMatch, _AnyMatch, all) -> + AllMatch; +headers_match([], _Data, _AllMatch, AnyMatch, any) -> + AnyMatch; +headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data, + AllMatch, AnyMatch, MatchKind) -> + headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind); +headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) -> + headers_match([], [], false, AnyMatch, MatchKind); +headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK > DK -> + headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind); +headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _], + _AllMatch, AnyMatch, MatchKind) when PK < DK -> + headers_match(PRest, Data, false, AnyMatch, MatchKind); +headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], + AllMatch, AnyMatch, MatchKind) when PK == DK -> + {AllMatch1, AnyMatch1} = + if + %% It's not properly specified, but a "no value" in a + %% pattern field is supposed to mean simple presence of + %% the corresponding data field. I've interpreted that to + %% mean a type of "void" for the pattern field. + PT == void -> {AllMatch, true}; + %% Similarly, it's not specified, but I assume that a + %% mismatched type causes a mismatched value. + PT =/= DT -> {false, AnyMatch}; + PV == DV -> {AllMatch, true}; + true -> {false, AnyMatch} + end, + headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind). + split_topic_key(Key) -> {ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."), KeySplit. |
