summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEmile Joubert <emile@rabbitmq.com>2011-01-19 12:49:30 +0000
committerEmile Joubert <emile@rabbitmq.com>2011-01-19 12:49:30 +0000
commitecf16c7f9b81b6d47f6441fa1d2a05c91b6cbfa4 (patch)
tree9b994baf1dd1004c5cdd892b4134145ab0857ae0
parent11296b5fdbec856257fa9038da940ace8945f9ae (diff)
downloadrabbitmq-server-git-ecf16c7f9b81b6d47f6441fa1d2a05c91b6cbfa4.tar.gz
Sender-specified distribution for fanout exchanges
-rw-r--r--codegen.py2
-rw-r--r--include/rabbit.hrl3
-rw-r--r--src/rabbit_exchange.erl26
-rw-r--r--src/rabbit_exchange_type_direct.erl11
-rw-r--r--src/rabbit_exchange_type_fanout.erl10
-rw-r--r--src/rabbit_misc.erl11
-rw-r--r--src/rabbit_router.erl5
7 files changed, 44 insertions, 24 deletions
diff --git a/codegen.py b/codegen.py
index 979c5bd82d..6e9139b842 100644
--- a/codegen.py
+++ b/codegen.py
@@ -354,7 +354,7 @@ def genErl(spec):
-type(amqp_field_type() ::
'longstr' | 'signedint' | 'decimal' | 'timestamp' |
'table' | 'byte' | 'double' | 'float' | 'long' |
- 'short' | 'bool' | 'binary' | 'void').
+ 'short' | 'bool' | 'binary' | 'void' | 'array').
-type(amqp_property_type() ::
'shortstr' | 'longstr' | 'octet' | 'shortint' | 'longint' |
'longlongint' | 'timestamp' | 'bit' | 'table').
diff --git a/include/rabbit.hrl b/include/rabbit.hrl
index 81c3996bcb..5c5fad761f 100644
--- a/include/rabbit.hrl
+++ b/include/rabbit.hrl
@@ -96,6 +96,9 @@
-define(DESIRED_HIBERNATE, 10000).
-define(STATS_INTERVAL, 5000).
+-define(ROUTING_HEADERS, [<<"CC">>, <<"BCC">>]).
+-define(DELETED_HEADER, <<"BCC">>).
+
-ifdef(debug).
-define(LOGDEBUG0(F), rabbit_log:debug(F)).
-define(LOGDEBUG(F,A), rabbit_log:debug(F,A)).
diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl
index a95cf0b199..d9e3431da7 100644
--- a/src/rabbit_exchange.erl
+++ b/src/rabbit_exchange.erl
@@ -35,6 +35,7 @@
-export([recover/0, declare/6, lookup/1, lookup_or_die/1, list/1, info_keys/0,
info/1, info/2, info_all/1, info_all/2, publish/2, delete/2]).
+-export([header_routes/2]).
%% this must be run inside a mnesia tx
-export([maybe_auto_delete/1]).
-export([assert_equivalence/6, assert_args_equivalence/2, check_type/1]).
@@ -86,7 +87,8 @@
-spec(maybe_auto_delete/1::
(rabbit_types:exchange())
-> 'not_deleted' | {'deleted', rabbit_binding:deletions()}).
-
+-spec(header_routes/2 :: (rabbit_framing:amqp_table(), rabbit_types:vhost()) ->
+ [rabbit_types:r('queue')]).
-endif.
%%----------------------------------------------------------------------------
@@ -319,3 +321,25 @@ unconditional_delete(X = #exchange{name = XName}) ->
ok = mnesia:delete({rabbit_exchange, XName}),
Bindings = rabbit_binding:remove_for_source(XName),
{deleted, X, Bindings, rabbit_binding:remove_for_destination(XName)}.
+
+header_routes(undefined, _VHost) ->
+ [];
+header_routes(Headers, VHost) ->
+ [rabbit_misc:r(VHost, queue, RKey) ||
+ RKey <- lists:flatten([routing_keys(Headers, Header) ||
+ Header <- ?ROUTING_HEADERS])].
+
+routing_keys(HeadersTable, Key) ->
+ case rabbit_misc:table_lookup(HeadersTable, Key) of
+ {longstr, Route} -> [Route];
+ {array, Routes} -> rkeys(Routes, []);
+ _ -> []
+ end.
+
+rkeys([{longstr, BinVal} | Rest], RKeys) ->
+ rkeys(Rest, [BinVal | RKeys]);
+rkeys([{_, _} | Rest], RKeys) ->
+ rkeys(Rest, RKeys);
+rkeys(_, RKeys) ->
+ RKeys.
+
diff --git a/src/rabbit_exchange_type_direct.erl b/src/rabbit_exchange_type_direct.erl
index ab6888534d..9547117c4e 100644
--- a/src/rabbit_exchange_type_direct.erl
+++ b/src/rabbit_exchange_type_direct.erl
@@ -55,14 +55,9 @@ route(#exchange{name = #resource{virtual_host = VHost} = Name},
#delivery{message = #basic_message{routing_key = RoutingKey,
content = Content}}) ->
BindingRoutes = rabbit_router:match_routing_key(Name, RoutingKey),
- HeaderRKeys =
- case (Content#content.properties)#'P_basic'.headers of
- undefined -> [];
- Headers -> rabbit_misc:table_lookup(Headers, <<"CC">>, <<0>>) ++
- rabbit_misc:table_lookup(Headers, <<"BCC">>, <<0>>)
- end,
- HeaderRoutes = [rabbit_misc:r(VHost, queue, RKey) || RKey <- HeaderRKeys],
- lists:usort(BindingRoutes ++ HeaderRoutes).
+ HeaderRoutes = rabbit_exchange:header_routes(
+ (Content#content.properties)#'P_basic'.headers, VHost),
+ BindingRoutes ++ HeaderRoutes.
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_exchange_type_fanout.erl b/src/rabbit_exchange_type_fanout.erl
index e7f754644c..e9faf0a257 100644
--- a/src/rabbit_exchange_type_fanout.erl
+++ b/src/rabbit_exchange_type_fanout.erl
@@ -31,6 +31,7 @@
-module(rabbit_exchange_type_fanout).
-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
-behaviour(rabbit_exchange_type).
@@ -50,8 +51,13 @@ description() ->
[{name, <<"fanout">>},
{description, <<"AMQP fanout exchange, as per the AMQP specification">>}].
-route(#exchange{name = Name}, _Delivery) ->
- rabbit_router:match_routing_key(Name, '_').
+route(#exchange{name = #resource{virtual_host = VHost} = Name},
+ #delivery{message = #basic_message{content = Content}}) ->
+ BindingRoutes = rabbit_router:match_routing_key(Name, '_'),
+ HeaderRoutes = rabbit_exchange:header_routes(
+ (Content#content.properties)#'P_basic'.headers, VHost),
+ BindingRoutes ++ HeaderRoutes.
+
validate(_X) -> ok.
create(_X) -> ok.
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 604346edec..15ba787ad2 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -40,7 +40,7 @@
protocol_error/3, protocol_error/4, protocol_error/1]).
-export([not_found/1, assert_args_equivalence/4]).
-export([dirty_read/1]).
--export([table_lookup/3, table_lookup/2]).
+-export([table_lookup/2]).
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
@@ -112,8 +112,6 @@
'ok' | rabbit_types:connection_exit()).
-spec(dirty_read/1 ::
({atom(), any()}) -> rabbit_types:ok_or_error2(any(), 'not_found')).
--spec(table_lookup/3 ::
- (rabbit_framing:amqp_table(), binary(), binary()) -> [binary()]).
-spec(table_lookup/2 ::
(rabbit_framing:amqp_table(), binary())
-> 'undefined' | {rabbit_framing:amqp_field_type(), any()}).
@@ -255,13 +253,6 @@ dirty_read(ReadSpec) ->
[] -> {error, not_found}
end.
-table_lookup(Table, Key, Separator) ->
- case table_lookup(Table, Key) of
- undefined -> [];
- {longstr, BinVal} -> binary:split(BinVal, Separator, [global]);
- _ -> []
- end.
-
table_lookup(Table, Key) ->
case lists:keysearch(Key, 1, Table) of
{value, {_, TypeBin, ValueBin}} -> {TypeBin, ValueBin};
diff --git a/src/rabbit_router.erl b/src/rabbit_router.erl
index 2f556df741..7f9b823eef 100644
--- a/src/rabbit_router.erl
+++ b/src/rabbit_router.erl
@@ -69,7 +69,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
%% is preserved. This scales much better than the non-immediate
%% case below.
QPids = lookup_qpids(QNames),
- ModifiedDelivery = strip_header(Delivery, <<"BCC">>),
+ ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER),
delegate:invoke_no_result(
QPids, fun (Pid) -> rabbit_amqqueue:deliver(Pid, ModifiedDelivery) end),
{routed, QPids};
@@ -77,7 +77,7 @@ deliver(QNames, Delivery = #delivery{mandatory = false,
deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
immediate = Immediate}) ->
QPids = lookup_qpids(QNames),
- ModifiedDelivery = strip_header(Delivery, <<"BCC">>),
+ ModifiedDelivery = strip_header(Delivery, ?DELETED_HEADER),
{Success, _} =
delegate:invoke(QPids,
fun (Pid) ->
@@ -87,6 +87,7 @@ deliver(QNames, Delivery = #delivery{mandatory = Mandatory,
lists:foldl(fun fold_deliveries/2, {false, []}, Success),
check_delivery(Mandatory, Immediate, {Routed, Handled}).
+%% This breaks the spec rule forbidding message modification
strip_header(Delivery = #delivery{message = Message = #basic_message{
content = Content = #content{
properties = Props = #'P_basic'{headers = Headers}}}},