summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-01-15 16:13:58 +1300
committerTony Garnock-Jones <tonyg@kcbbs.gen.nz>2009-01-15 16:13:58 +1300
commitbeb6e44312e4dc7182c51e88dd443cfe412386d0 (patch)
treee55d32aba42dbc71b69b89cd09bcbd617faf2fc9
parentabe958ccd670441b92771329acf3cd383709215d (diff)
downloadrabbitmq-server-git-beb6e44312e4dc7182c51e88dd443cfe412386d0.tar.gz
Implement routing for headers exchange.
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_exchange.erl111
2 files changed, 101 insertions, 12 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index ca2782c77d..cbdc9e48fe 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -273,7 +273,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin,
routing_key = RoutingKey,
content = DecodedContent,
persistent_key = PersistentKey},
- rabbit_exchange:route(Exchange, RoutingKey), State)};
+ rabbit_exchange:route(Exchange, RoutingKey, DecodedContent), State)};
handle_method(#'basic.ack'{delivery_tag = DeliveryTag,
multiple = Multiple},
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index 03478a4d49..a4e6d2196d 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -37,11 +37,11 @@
-export([recover/0, declare/5, lookup/1, lookup_or_die/1,
list/1, info/1, info/2, info_all/1, info_all/2,
simple_publish/6, simple_publish/3,
- route/2]).
+ route/3]).
-export([add_binding/4, delete_binding/4, list_bindings/1]).
-export([delete/2]).
-export([delete_bindings_for_queue/1]).
--export([check_type/1, assert_type/2, topic_matches/2]).
+-export([check_type/1, assert_type/2, topic_matches/2, headers_match/2]).
%% EXTENDED API
-export([list_exchange_bindings/1]).
@@ -77,7 +77,7 @@
(bool(), bool(), exchange_name(), routing_key(), binary(), binary()) ->
publish_res()).
-spec(simple_publish/3 :: (bool(), bool(), message()) -> publish_res()).
--spec(route/2 :: (exchange(), routing_key()) -> [pid()]).
+-spec(route/3 :: (exchange(), routing_key(), decoded_content()) -> [pid()]).
-spec(add_binding/4 ::
(exchange_name(), queue_name(), routing_key(), amqp_table()) ->
bind_res() | {'error', 'durability_settings_incompatible'}).
@@ -88,6 +88,7 @@
[{exchange_name(), queue_name(), routing_key(), amqp_table()}]).
-spec(delete_bindings_for_queue/1 :: (queue_name()) -> 'ok').
-spec(topic_matches/2 :: (binary(), binary()) -> bool()).
+-spec(headers_match/2 :: (amqp_table(), amqp_table()) -> bool()).
-spec(delete/2 :: (exchange_name(), bool()) ->
'ok' | not_found() | {'error', 'in_use'}).
-spec(list_queue_bindings/1 :: (queue_name()) ->
@@ -213,15 +214,19 @@ simple_publish(Mandatory, Immediate, ExchangeName, RoutingKeyBin,
%% Usable by Erlang code that wants to publish messages.
simple_publish(Mandatory, Immediate,
Message = #basic_message{exchange_name = ExchangeName,
- routing_key = RoutingKey}) ->
+ routing_key = RoutingKey,
+ content = Content}) ->
case lookup(ExchangeName) of
{ok, Exchange} ->
- QPids = route(Exchange, RoutingKey),
+ QPids = route(Exchange, RoutingKey, Content),
rabbit_router:deliver(QPids, Mandatory, Immediate,
none, Message);
{error, Error} -> {error, Error}
end.
+sort_arguments(Arguments) ->
+ lists:keysort(1, Arguments).
+
%% return the list of qpids to which a message with a given routing
%% key, sent to a particular exchange, should be delivered.
%%
@@ -230,7 +235,7 @@ simple_publish(Mandatory, Immediate,
%% current exchange types that is at most once.
%%
%% TODO: Maybe this should be handled by a cursor instead.
-route(#exchange{name = Name, type = topic}, RoutingKey) ->
+route(#exchange{name = Name, type = topic}, RoutingKey, _Content) ->
Query = qlc:q([QName ||
#route{binding = #binding{
exchange_name = ExchangeName,
@@ -254,13 +259,37 @@ route(#exchange{name = Name, type = topic}, RoutingKey) ->
topic_matches(BindingKey, RoutingKey)]
end);
-route(X = #exchange{type = headers}, _RoutingKey) ->
- exit(headers_unimplemented);
+route(#exchange{name = Name, type = headers},
+ _RoutingKey,
+ #content{properties = #'P_basic'{headers = Headers0}}) ->
+ Headers = case Headers0 of
+ undefined -> [];
+ _ -> sort_arguments(Headers0)
+ end,
+ Query = qlc:q([QName ||
+ #route{binding = #binding{exchange_name = ExchangeName,
+ queue_name = QName,
+ args = Spec}}
+ <- mnesia:table(route),
+ ExchangeName == Name,
+ headers_match(Spec, Headers)]),
+ lookup_qpids(
+ try
+ mnesia:async_dirty(fun qlc:e/1, [Query])
+ catch exit:{aborted, {badarg, _}} ->
+ %% work around OTP-7025, which was fixed in R12B-1, by
+ %% falling back on a less efficient method
+ [QName || #route{binding = #binding{queue_name = QName, args = Spec}}
+ <- mnesia:dirty_match_object(
+ #route{binding = #binding{exchange_name = Name,
+ _ = '_'}}),
+ headers_match(Spec, Headers)]
+ end);
-route(X = #exchange{type = fanout}, _) ->
+route(X = #exchange{type = fanout}, _RoutingKey, _Content) ->
route_internal(X, '_');
-route(X = #exchange{type = direct}, RoutingKey) ->
+route(X = #exchange{type = direct}, RoutingKey, _Content) ->
route_internal(X, RoutingKey).
route_internal(#exchange{name = Name}, RoutingKey) ->
@@ -382,7 +411,7 @@ sync_binding(ExchangeName, QueueName, RoutingKey, Arguments, Durable, Fun) ->
Binding = #binding{exchange_name = ExchangeName,
queue_name = QueueName,
key = RoutingKey,
- args = Arguments},
+ args = sort_arguments(Arguments)},
ok = case Durable of
true -> Fun(durable_routes, #route{binding = Binding}, write);
false -> ok
@@ -434,6 +463,66 @@ reverse_binding(#binding{exchange_name = Exchange,
key = Key,
args = Args}.
+default_headers_match_kind() -> all.
+
+parse_x_match(<<"all">>) -> all;
+parse_x_match(<<"any">>) -> any;
+parse_x_match(Other) ->
+ rabbit_log:warning("Invalid x-match field value ~p; expected all or any", [Other]),
+ default_headers_match_kind().
+
+%% Horrendous matching algorithm. Depends for its merge-like
+%% (linear-time) behaviour on the lists:keysort (sort_arguments) that
+%% route/3 and sync_binding/6 do.
+%%
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%% In other words: REQUIRES BOTH PATTERN AND DATA TO BE SORTED ASCENDING BY KEY.
+%% !!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!
+%%
+headers_match(Pattern, Data) ->
+ MatchKind = case lists:keysearch(<<"x-match">>, 1, Pattern) of
+ {value, {_, longstr, MK}} -> parse_x_match(MK);
+ {value, {_, Type, MK}} ->
+ rabbit_log:warning("Invalid x-match field type ~p (value ~p); expected longstr",
+ [Type, MK]),
+ default_headers_match_kind();
+ _ -> default_headers_match_kind()
+ end,
+ headers_match(Pattern, Data, true, false, MatchKind).
+
+headers_match([], _Data, AllMatch, _AnyMatch, all) ->
+ AllMatch;
+headers_match([], _Data, _AllMatch, AnyMatch, any) ->
+ AnyMatch;
+headers_match([{<<"x-", _/binary>>, _PT, _PV} | PRest], D, AllMatch, AnyMatch, MatchKind) ->
+ headers_match(PRest, D, AllMatch, AnyMatch, MatchKind);
+headers_match(_Pattern, [], _AllMatch, AnyMatch, MatchKind) ->
+ headers_match([], [], false, AnyMatch, MatchKind);
+headers_match(P = [{PK, _PT, _PV} | _], [{DK, _DT, _DV} | DRest], AllMatch, AnyMatch, MatchKind)
+ when PK > DK ->
+ headers_match(P, DRest, AllMatch, AnyMatch, MatchKind);
+headers_match([{PK, _PT, _PV} | PRest], D = [{DK, _DT, _DV} | _], _AllMatch, AnyMatch, MatchKind)
+ when PK < DK ->
+ headers_match(PRest, D, false, AnyMatch, MatchKind);
+headers_match([{PK, PT, PV} | PRest], [{DK, DT, DV} | DRest], AllMatch, AnyMatch, MatchKind)
+ when PK == DK ->
+ if
+ PT == void ->
+ %% It's not properly specified, but a "no value" in a
+ %% pattern field is supposed to mean simple presence of
+ %% the corresponding data field. I've interpreted that to
+ %% mean a type of "void" for the pattern field.
+ headers_match(PRest, DRest, AllMatch, true, MatchKind);
+ PT =/= DT ->
+ %% Similarly, it's not specified, but I assume that a
+ %% mismatched type causes a mismatched value.
+ headers_match(PRest, DRest, false, AnyMatch, MatchKind);
+ PV == DV ->
+ headers_match(PRest, DRest, AllMatch, true, MatchKind);
+ true ->
+ headers_match(PRest, DRest, false, AnyMatch, MatchKind)
+ end.
+
split_topic_key(Key) ->
{ok, KeySplit} = regexp:split(binary_to_list(Key), "\\."),
KeySplit.