diff options
| -rw-r--r-- | src/rabbit_access_control.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_error_logger.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_exchange.erl | 43 | ||||
| -rw-r--r-- | src/rabbit_exchange_type.erl | 12 |
4 files changed, 37 insertions, 32 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index daf6f5af40..6ff7a1046c 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -242,12 +242,12 @@ add_vhost(VHostPath) -> rabbit_misc:r(VHostPath, exchange, Name), Type, true, false, []) || {Name,Type} <- - [{<<"">>, rabbit_exchange_type_direct}, - {<<"amq.direct">>, rabbit_exchange_type_direct}, - {<<"amq.topic">>, rabbit_exchange_type_topic}, - {<<"amq.match">>, rabbit_exchange_type_headers}, %% per 0-9-1 pdf - {<<"amq.headers">>, rabbit_exchange_type_headers}, %% per 0-9-1 xml - {<<"amq.fanout">>, rabbit_exchange_type_fanout}]], + [{<<"">>, direct}, + {<<"amq.direct">>, direct}, + {<<"amq.topic">>, topic}, + {<<"amq.match">>, headers}, %% per 0-9-1 pdf + {<<"amq.headers">>, headers}, %% per 0-9-1 xml + {<<"amq.fanout">>, fanout}]], ok; [_] -> mnesia:abort({vhost_already_exists, VHostPath}) diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index 9651ae12ac..b9bd71b78d 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -48,7 +48,7 @@ boot() -> init([DefaultVHost]) -> #exchange{} = rabbit_exchange:declare( rabbit_misc:r(DefaultVHost, exchange, ?LOG_EXCH_NAME), - rabbit_exchange_type_topic, true, false, []), + topic, true, false, []), {ok, #resource{virtual_host = DefaultVHost, kind = exchange, name = ?LOG_EXCH_NAME}}. diff --git a/src/rabbit_exchange.erl b/src/rabbit_exchange.erl index 05bef73af2..d004a279f8 100644 --- a/src/rabbit_exchange.erl +++ b/src/rabbit_exchange.erl @@ -123,7 +123,7 @@ recover_with_bindings([B = #binding{exchange_name = N} | Rest], when N =:= Name -> recover_with_bindings(Rest, Xs, [B | Bindings]); recover_with_bindings(Bs, [X = #exchange{ type = Type } | Xs], Bindings) -> - Type:recover(X, Bindings), + (type_to_module(Type)):recover(X, Bindings), recover_with_bindings(Bs, Xs, []); recover_with_bindings([], [], []) -> ok. @@ -137,7 +137,8 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> %% We want to upset things if it isn't ok; this is different from %% the other hooks invocations, where we tend to ignore the return %% value. - ok = Type:validate(Exchange), + TypeModule = (type_to_module(Type)), + ok = TypeModule:validate(Exchange), case rabbit_misc:execute_mnesia_transaction( fun () -> case mnesia:wread({rabbit_exchange, ExchangeName}) of @@ -154,13 +155,13 @@ declare(ExchangeName, Type, Durable, AutoDelete, Args) -> {existing, ExistingX} end end) of - {new, X} -> Type:create(X), + {new, X} -> TypeModule:create(X), X; {existing, X} -> X; Err -> Err end. -typename_to_plugin_module(T) -> +type_to_module(T) -> case rabbit_exchange_type:lookup_module(T) of {ok, Module} -> Module; {error, not_found} -> rabbit_misc:protocol_error( @@ -168,12 +169,9 @@ typename_to_plugin_module(T) -> "invalid exchange type '~s'", [T]) end. -plugin_module_to_typename(M) -> - {ok, TypeName} = rabbit_exchange_type:lookup_name(M), - TypeName. - -check_type(T) -> - Module = typename_to_plugin_module(T), +check_type(TypeBin) -> + T = rabbit_exchange_type:binary_to_type(TypeBin), + Module = type_to_module(T), case catch Module:description() of {'EXIT', {undef, [{_, description, []} | _]}} -> rabbit_misc:protocol_error( @@ -182,7 +180,7 @@ check_type(T) -> rabbit_misc:protocol_error( command_invalid, "problem loading exchange type '~s'", [T]); _ -> - Module + T end. assert_type(#exchange{ type = ActualType }, RequiredType) @@ -192,8 +190,8 @@ assert_type(#exchange{ name = Name, type = ActualType }, RequiredType) -> rabbit_misc:protocol_error( not_allowed, "cannot redeclare ~s of type '~s' with type '~s'", [rabbit_misc:rs(Name), - plugin_module_to_typename(ActualType), - plugin_module_to_typename(RequiredType)]). + ActualType, + RequiredType]). lookup(Name) -> rabbit_misc:dirty_read({rabbit_exchange, Name}). @@ -217,7 +215,7 @@ map(VHostPath, F) -> infos(Items, X) -> [{Item, i(Item, X)} || Item <- Items]. i(name, #exchange{name = Name}) -> Name; -i(type, #exchange{type = Type}) -> plugin_module_to_typename(Type); +i(type, #exchange{type = Type}) -> Type; i(durable, #exchange{durable = Durable}) -> Durable; i(auto_delete, #exchange{auto_delete = AutoDelete}) -> AutoDelete; i(arguments, #exchange{arguments = Arguments}) -> Arguments; @@ -235,7 +233,7 @@ publish(X, Delivery) -> publish(X, [], Delivery). publish(X = #exchange{type = Type}, Seen, Delivery) -> - case Type:publish(X, Delivery) of + case (type_to_module(Type)):publish(X, Delivery) of {_, []} = R -> #exchange{name = XName, arguments = Args} = X, case rabbit_misc:r_arg(XName, exchange, Args, @@ -306,9 +304,9 @@ delete_queue_bindings(QueueName, FwdDeleteFun) -> none, [], []), fun () -> lists:foreach(fun ({{deleted, X = #exchange{ type = Type}}, Bs}) -> - Type:delete(X, Bs); + (type_to_module(Type)):delete(X, Bs); ({{_, X = #exchange{ type = Type }}, Bs}) -> - [Type:delete_binding(X, B) || B <- Bs] + [(type_to_module(Type)):delete_binding(X, B) || B <- Bs] end, BindingsWithExchanges) end. @@ -392,7 +390,7 @@ add_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> end end) of {new, Exchange = #exchange{ type = Type }, Binding} -> - Type:add_binding(Exchange, Binding); + (type_to_module(Type)):add_binding(Exchange, Binding); {existing, _, _} -> ok; Err = {error, _} -> @@ -412,11 +410,12 @@ delete_binding(ExchangeName, QueueName, RoutingKey, Arguments) -> end end) of {{deleted, X = #exchange{ type = Type }}, B} -> - Type:delete_binding(X, B), - Type:delete(X), + Module = (type_to_module(Type)), + Module:delete_binding(X, B), + Module:delete(X), ok; {{no_delete, X = #exchange{ type = Type }}, B} -> - Type:delete_binding(X, B), + (type_to_module(Type)):delete_binding(X, B), ok; Err -> Err @@ -495,7 +494,7 @@ delete(ExchangeName, IfUnused) -> end, case call_with_exchange(ExchangeName, Fun) of {deleted, X = #exchange{ type = Type }, Bs} -> - Type:delete(X, Bs), + (type_to_module(Type)):delete(X, Bs), ok; Err -> Err diff --git a/src/rabbit_exchange_type.erl b/src/rabbit_exchange_type.erl index 58dcfbb6a2..9d5fb20229 100644 --- a/src/rabbit_exchange_type.erl +++ b/src/rabbit_exchange_type.erl @@ -38,10 +38,12 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). --export([register/2, lookup_module/1, lookup_name/1]). +-export([register/2, binary_to_type/1, lookup_module/1, lookup_name/1]). -define(SERVER, ?MODULE). +%% TODO specs + %%--------------------------------------------------------------------------- start_link() -> @@ -52,7 +54,10 @@ start_link() -> register(TypeName, ModuleName) -> gen_server:call(?SERVER, {register, TypeName, ModuleName}). -lookup_module(T) when is_binary(T) -> +binary_to_type(TypeBin) when is_binary(TypeBin) -> + list_to_atom(binary_to_list(TypeBin)). + +lookup_module(T) when is_atom(T) -> case ets:lookup(rabbit_exchange_type_modules, T) of [{_, Module}] -> {ok, Module}; @@ -68,7 +73,8 @@ lookup_name(M) when is_atom(M) -> internal_register(TypeName, ModuleName) when is_binary(TypeName), is_atom(ModuleName) -> - true = ets:insert(rabbit_exchange_type_modules, {TypeName, ModuleName}), + true = ets:insert(rabbit_exchange_type_modules, + {binary_to_type(TypeName), ModuleName}), true = ets:insert(rabbit_exchange_type_names, {ModuleName, TypeName}), ok. |
