diff options
| author | Emile Joubert <emile@rabbitmq.com> | 2011-01-19 12:49:30 +0000 |
|---|---|---|
| committer | Emile Joubert <emile@rabbitmq.com> | 2011-01-19 12:49:30 +0000 |
| commit | ecf16c7f9b81b6d47f6441fa1d2a05c91b6cbfa4 (patch) | |
| tree | 9b994baf1dd1004c5cdd892b4134145ab0857ae0 | |
| parent | 11296b5fdbec856257fa9038da940ace8945f9ae (diff) | |
| download | rabbitmq-server-git-ecf16c7f9b81b6d47f6441fa1d2a05c91b6cbfa4.tar.gz | |
Sender-specified distribution for fanout exchanges
| -rw-r--r-- | codegen.py | 2 | ||||
| -rw-r--r-- | include/rabbit.hrl | 3 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 26 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_direct.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_exchange_type_fanout.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_misc.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_router.erl | 5 |
7 files changed, 44 insertions, 24 deletions
diff --git a/codegen.py b/codegen.py index 979c5bd82d..6e9139b842 100644 --- a/codegen.py +++ b/codegen.py @@ -354,7 +354,7 @@ def genErl(spec): -type(amqp_field_type() :: 'longstr' | 'signedint' | 'decimal' | 'timestamp' | 'table' | 'byte' | 'double' | 'float' | 'long' | - 'short' | 'bool' | 'binary' | 'void'). + 'short' | 'bool' | 'binary' | 'void' | 'array'). -type(amqp_property_type() :: 'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' | 'longlongint' | 'timestamp' | 'bit' | 'table'). diff --git a/include/rabbit.hrl b/include/rabbit.hrl index 81c3996bcb..5c5fad761f 100644 --- a/include/rabbit.hrl +++ b/include/rabbit.hrl @@ -96,6 +96,9 @@ -define(DESIRED_HIBERNATE, 10000). -define(STATS_INTERVAL, 5000). +-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]). +-define(DELETED_HEADER, <<"BCC">>). + -ifdef(debug). -define(LOGDEBUG0(F), rabbit_log:debug(F)). -define(LOGDEBUG(F,A), rabbit_log:debug(F,A)). diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index a95cf0b199..d9e3431da7 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -35,6 +35,7 @@ -export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]). +-export([header_routes/2]). %% this must be run inside a mnesia tx -export([maybe_auto_delete/1]). -export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]). @@ -86,7 +87,8 @@ -spec(maybe_auto_delete/1:: (rabbit_types:exchange()) -> 'not_deleted' | {'deleted', rabbit_binding:deletions()}). - +-spec(header_routes/2 :: (rabbit_framing:amqp_table(), rabbit_types:vhost()) -> + [rabbit_types:r('queue')]). -endif. %%---------------------------------------------------------------------------- @@ -319,3 +321,25 @@ unconditional_delete(X = #exchange{name = XName}) -> ok = mnesia:delete({rabbit_exchange, XName}), Bindings = rabbit_binding:remove_for_source(XName), {deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}. + +header_routes(undefined, _VHost) -> + []; +header_routes(Headers, VHost) -> + [rabbit_misc:r(VHost, queue, RKey) || + RKey <- lists:flatten([routing_keys(Headers, Header) || + Header <- ?ROUTING_HEADERS])]. + +routing_keys(HeadersTable, Key) -> + case rabbit_misc:table_lookup(HeadersTable, Key) of + {longstr, Route} -> [Route]; + {array, Routes} -> rkeys(Routes, []); + _ -> [] + end. + +rkeys([{longstr, BinVal} | Rest], RKeys) -> + rkeys(Rest, [BinVal | RKeys]); +rkeys([{_, _} | Rest], RKeys) -> + rkeys(Rest, RKeys); +rkeys(_, RKeys) -> + RKeys. + diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl index ab6888534d..9547117c4e 100644 --- a/src/rabbit_exchange_type_direct.erl +++ b/src/rabbit_exchange_type_direct.erl @@ -55,14 +55,9 @@ route(#exchange{name = #resource{virtual_host = VHost} = Name}, #delivery{message = #basic_message{routing_key = RoutingKey, content = Content}}) -> BindingRoutes = rabbit_router:match_routing_key(Name, RoutingKey), - HeaderRKeys = - case (Content#content.properties)#'P_basic'.headers of - undefined -> []; - Headers -> rabbit_misc:table_lookup(Headers, <<"CC">>, <<0>>) ++ - rabbit_misc:table_lookup(Headers, <<"BCC">>, <<0>>) - end, - HeaderRoutes = [rabbit_misc:r(VHost, queue, RKey) || RKey <- HeaderRKeys], - lists:usort(BindingRoutes ++ HeaderRoutes). + HeaderRoutes = rabbit_exchange:header_routes( + (Content#content.properties)#'P_basic'.headers, VHost), + BindingRoutes ++ HeaderRoutes. validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl index e7f754644c..e9faf0a257 100644 --- a/src/rabbit_exchange_type_fanout.erl +++ b/src/rabbit_exchange_type_fanout.erl @@ -31,6 +31,7 @@ -module(rabbit_exchange_type_fanout). -include("rabbit.hrl"). +-include("rabbit_framing.hrl"). -behaviour(rabbit_exchange_type). @@ -50,8 +51,13 @@ description() -> [{name, <<"fanout">>}, {description, <<"AMQP fanout exchange, as per the AMQP specification">>}]. -route(#exchange{name = Name}, _Delivery) -> - rabbit_router:match_routing_key(Name, '_'). +route(#exchange{name = #resource{virtual_host = VHost} = Name}, + #delivery{message = #basic_message{content = Content}}) -> + BindingRoutes = rabbit_router:match_routing_key(Name, '_'), + HeaderRoutes = rabbit_exchange:header_routes( + (Content#content.properties)#'P_basic'.headers, VHost), + BindingRoutes ++ HeaderRoutes. + validate(_X) -> ok. create(_X) -> ok. diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl index 604346edec..15ba787ad2 100644 --- a/src/rabbit_misc.erl +++ b/src/rabbit_misc.erl @@ -40,7 +40,7 @@ protocol_error/3, protocol_error/4, protocol_error/1]). -export([not_found/1, assert_args_equivalence/4]). -export([dirty_read/1]). --export([table_lookup/3, table_lookup/2]). +-export([table_lookup/2]). -export([r/3, r/2, r_arg/4, rs/1]). -export([enable_cover/0, report_cover/0]). -export([enable_cover/1, report_cover/1]). @@ -112,8 +112,6 @@ 'ok' | rabbit_types:connection_exit()). -spec(dirty_read/1 :: ({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')). --spec(table_lookup/3 :: - (rabbit_framing:amqp_table(), binary(), binary()) -> [binary()]). -spec(table_lookup/2 :: (rabbit_framing:amqp_table(), binary()) -> 'undefined' | {rabbit_framing:amqp_field_type(), any()}). @@ -255,13 +253,6 @@ dirty_read(ReadSpec) -> [] -> {error, not_found} end. -table_lookup(Table, Key, Separator) -> - case table_lookup(Table, Key) of - undefined -> []; - {longstr, BinVal} -> binary:split(BinVal, Separator, [global]); - _ -> [] - end. - table_lookup(Table, Key) -> case lists:keysearch(Key, 1, Table) of {value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin}; diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl index 2f556df741..7f9b823eef 100644 --- a/src/rabbit_router.erl +++ b/src/rabbit_router.erl @@ -69,7 +69,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, %% is preserved. This scales much better than the non-immediate %% case below. QPids = lookup_qpids(QNames), - ModifiedDelivery = strip_header(Delivery, <<"BCC">>), + ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER), delegate:invoke_no_result( QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end), {routed, QPids}; @@ -77,7 +77,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false, deliver(QNames, Delivery = #delivery{mandatory = Mandatory, immediate = Immediate}) -> QPids = lookup_qpids(QNames), - ModifiedDelivery = strip_header(Delivery, <<"BCC">>), + ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER), {Success, _} = delegate:invoke(QPids, fun (Pid) -> @@ -87,6 +87,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory, lists:foldl(fun fold_deliveries/2, {false, []}, Success), check_delivery(Mandatory, Immediate, {Routed, Handled}). +%% This breaks the spec rule forbidding message modification strip_header(Delivery = #delivery{message = Message = #basic_message{ content = Content = #content{ properties = Props = #'P_basic'{headers = Headers}}}}, |
