diff options
| -rw-r--r-- | src/rabbit.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_hooks.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 58 |
3 files changed, 132 insertions, 0 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl index f1dcd51f0d..88c60eb99a 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -133,6 +133,7 @@ start(normal, []) -> {"core processes", fun () -> ok = start_child(rabbit_log), + ok = rabbit_hooks:start(), ok = rabbit_amqqueue:start(), diff --git a/src/rabbit_hooks.erl b/src/rabbit_hooks.erl new file mode 100644 index 0000000000..b3d271c28d --- /dev/null +++ b/src/rabbit_hooks.erl @@ -0,0 +1,73 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developers of the Original Code are LShift Ltd, +%% Cohesive Financial Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created before 22-Nov-2008 00:00:00 GMT by LShift Ltd, +%% Cohesive Financial Technologies LLC, or Rabbit Technologies Ltd +%% are Copyright (C) 2007-2008 LShift Ltd, Cohesive Financial +%% Technologies LLC, and Rabbit Technologies Ltd. +%% +%% Portions created by LShift Ltd are Copyright (C) 2007-2009 LShift +%% Ltd. Portions created by Cohesive Financial Technologies LLC are +%% Copyright (C) 2007-2009 Cohesive Financial Technologies +%% LLC. Portions created by Rabbit Technologies Ltd are Copyright +%% (C) 2007-2009 Rabbit Technologies Ltd. +%% +%% All Rights Reserved. +%% +%% Contributor(s): ______________________________________. +%% + +-module(rabbit_hooks). + +-export([start/0]). +-export([subscribe/3, unsubscribe/2, trigger/2, notify_remote/5]). + +-define(TableName, rabbit_hooks). + +-ifdef(use_specs). + +-spec(start/0 :: () -> 'ok'). +-spec(subscribe/3 :: (atom(), atom(), {atom(), atom(), list()}) -> 'ok'). +-spec(unsubscribe/2 :: (atom(), atom()) -> 'ok'). +-spec(trigger/2 :: (atom(), list()) -> 'ok'). +-spec(notify_remote/5 :: (atom(), atom(), list(), pid(), list()) -> 'ok'). + +-endif. + +start() -> + ets:new(?TableName, [bag, public, named_table]), + ok. + +subscribe(Hook, HandlerName, Handler) -> + ets:insert(?TableName, {Hook, HandlerName, Handler}), + ok. + +unsubscribe(Hook, HandlerName) -> + ets:match_delete(?TableName, {Hook, HandlerName, '_'}), + ok. + +trigger(Hook, Args) -> + Hooks = ets:lookup(?TableName, Hook), + [case catch apply(M, F, [Hook, Name, Args | A]) of + {'EXIT', Reason} -> + rabbit_log:warning("Failed to execute handler ~p for hook ~p: ~p", + [Name, Hook, Reason]); + _ -> ok + end || {_, Name, {M, F, A}} <- Hooks], + ok. + +notify_remote(Hook, HandlerName, Args, Pid, PidArgs) -> + Pid ! {rabbitmq_hook, [Hook, HandlerName, Args | PidArgs]}, + ok. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 10a9873adc..6e3a92d037 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -35,6 +35,9 @@ -export([all_tests/0, test_parsing/0, test_disk_queue/0]). +%% Exported so the hook mechanism can call back +-export([handle_hook/3, bad_handle_hook/3, extra_arg_hook/5]). + -import(lists). -include("rabbit.hrl"). @@ -57,6 +60,7 @@ all_tests() -> passed = test_cluster_management(), passed = test_user_management(), passed = test_server_status(), + passed = test_hooks(), passed. test_priority_queue() -> @@ -655,6 +659,52 @@ test_server_status() -> passed. +test_hooks() -> + %% Firing of hooks calls all hooks in an isolated manner + rabbit_hooks:subscribe(test_hook, test, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:subscribe(test_hook2, test2, {rabbit_tests, handle_hook, []}), + rabbit_hooks:trigger(test_hook, [arg1, arg2]), + [arg1, arg2] = get(test_hook_test_fired), + [arg1, arg2] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Hook Deletion works + put(test_hook_test_fired, undefined), + put(test_hook_test2_fired, undefined), + rabbit_hooks:unsubscribe(test_hook, test), + rabbit_hooks:trigger(test_hook, [arg3, arg4]), + undefined = get(test_hook_test_fired), + [arg3, arg4] = get(test_hook_test2_fired), + undefined = get(test_hook2_test2_fired), + + %% Catches exceptions from bad hooks + rabbit_hooks:subscribe(test_hook3, test, {rabbit_tests, bad_handle_hook, []}), + ok = rabbit_hooks:trigger(test_hook3, []), + + %% Passing extra arguments to hooks + rabbit_hooks:subscribe(arg_hook, test, {rabbit_tests, extra_arg_hook, [1, 3]}), + rabbit_hooks:trigger(arg_hook, [arg1, arg2]), + {[arg1, arg2], 1, 3} = get(arg_hook_test_fired), + + %% Invoking Pids + Remote = fun() -> + receive + {rabbitmq_hook,[remote_test,test,[],Target]} -> + Target ! invoked + end + end, + P = spawn(Remote), + rabbit_hooks:subscribe(remote_test, test, {rabbit_hooks, notify_remote, [P, [self()]]}), + rabbit_hooks:trigger(remote_test, []), + receive + invoked -> ok + after 100 -> + io:format("Remote hook not invoked"), + throw(timeout) + end, + passed. + %--------------------------------------------------------------------- control_action(Command, Args) -> control_action(Command, node(), Args). @@ -739,6 +789,14 @@ delete_log_handlers(Handlers) -> Handler <- Handlers], ok. +handle_hook(HookName, Handler, Args) -> + A = atom_to_list(HookName) ++ "_" ++ atom_to_list(Handler) ++ "_fired", + put(list_to_atom(A), Args). +bad_handle_hook(_, _, _) -> + bad:bad(). +extra_arg_hook(Hookname, Handler, Args, Extra1, Extra2) -> + handle_hook(Hookname, Handler, {Args, Extra1, Extra2}). + test_disk_queue() -> rdq_stop(), rdq_virgin(), |
