summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_access_control.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_exchange.erl135
3 files changed, 110 insertions, 29 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index b73090fc44..36270efddc 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -186,6 +186,8 @@ add_vhost(VHostPath) ->
[{<<"">>, direct},
{<<"amq.direct">>, direct},
{<<"amq.topic">>, topic},
+ {<<"amq.match">>, headers}, %% per 0-9-1 pdf
+ {<<"amq.headers">>, headers}, %% per 0-9-1 xml
{<<"amq.fanout">>, fanout}]],
ok;
[_] ->
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ca2782c77d..cbdc9e48fe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -273,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 925c335cee..960e4945fe 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,11 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_bindings_for_queue/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -77,7 +77,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -88,6 +88,7 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -145,6 +146,8 @@ check_type(<<"direct">>) ->
direct;
check_type(<<"topic">>) ->
topic;
+check_type(<<"headers">>) ->
+ headers;
check_type(T) ->
rabbit_misc:protocol_error(
command_invalid, "invalid exchange type '~s'", [T]).
@@ -211,54 +214,69 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
%% The function ensures that a qpid appears in the return list exactly
%% as many times as a message should be delivered to it. With the
%% current exchange types that is at most once.
-%%
+route(X = #exchange{type = topic}, RoutingKey, _Content) ->
+ match_bindings(X, fun (#binding{key = BindingKey}) ->
+ topic_matches(BindingKey, RoutingKey)
+ end);
+
+route(X = #exchange{type = headers}, _RoutingKey, Content) ->
+ Headers = case (Content#content.properties)#'P_basic'.headers of
+ undefined -> [];
+ H -> sort_arguments(H)
+ end,
+ match_bindings(X, fun (#binding{args = Spec}) ->
+ headers_match(Spec, Headers)
+ end);
+
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
+ match_routing_key(X, '_');
+
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
+ match_routing_key(X, RoutingKey).
+
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
- Query = qlc:q([QName ||
- #route{binding = #binding{
- exchange_name = ExchangeName,
- queue_name = QName,
- key = BindingKey}} <- mnesia:table(route),
- ExchangeName == Name,
- %% TODO: This causes a full scan for each entry
- %% with the same exchange (see bug 19336)
- topic_matches(BindingKey, RoutingKey)]),
+%% TODO: This causes a full scan for each entry with the same exchange
+match_bindings(#exchange{name = Name}, Match) ->
+ Query = qlc:q([QName || #route{binding = Binding = #binding{
+ exchange_name = ExchangeName,
+ queue_name = QName}} <-
+ mnesia:table(route),
+ ExchangeName == Name,
+ Match(Binding)]),
lookup_qpids(
try
mnesia:async_dirty(fun qlc:e/1, [Query])
catch exit:{aborted, {badarg, _}} ->
%% work around OTP-7025, which was fixed in R12B-1, by
%% falling back on a less efficient method
- [QName || #route{binding = #binding{queue_name = QName,
- key = BindingKey}} <-
+ [QName || #route{binding = Binding = #binding{
+ queue_name = QName}} <-
mnesia:dirty_match_object(
#route{binding = #binding{exchange_name = Name,
_ = '_'}}),
- topic_matches(BindingKey, RoutingKey)]
- end);
-
-route(X = #exchange{type = fanout}, _) ->
- route_internal(X, '_');
-
-route(X = #exchange{type = direct}, RoutingKey) ->
- route_internal(X, RoutingKey).
+ Match(Binding)]
+ end).
-route_internal(#exchange{name = Name}, RoutingKey) ->
+match_routing_key(#exchange{name = Name}, RoutingKey) ->
MatchHead = #route{binding = #binding{exchange_name = Name,
queue_name = '$1',
key = RoutingKey,
@@ -377,7 +395,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
@@ -429,6 +447,67 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any",
+ [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p "
+ "(value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], Data,
+ AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, Data, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(Pattern = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK > DK ->
+ headers_match(Pattern, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], Data = [{DK, _DT, _DV} | _],
+ _AllMatch, AnyMatch, MatchKind) when PK < DK ->
+ headers_match(PRest, Data, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest],
+ AllMatch, AnyMatch, MatchKind) when PK == DK ->
+ {AllMatch1, AnyMatch1} =
+ if
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ PT == void -> {AllMatch, true};
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ PT =/= DT -> {false, AnyMatch};
+ PV == DV -> {AllMatch, true};
+ true -> {false, AnyMatch}
+ end,
+ headers_match(PRest, DRest, AllMatch1, AnyMatch1, MatchKind).
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.