summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2019-09-20 13:36:07 +0100
committerdcorbacho <dparracorbacho@piotal.io>2019-09-20 13:36:07 +0100
commit6116a11cb25575a734f16074070418e805045c1d (patch)
tree02fb609f91a71578b975adbf0f7b3cc39fffe773 /src
parent28e14a0559677f026ba058df294b9fa03afc4902 (diff)
downloadrabbitmq-server-git-6116a11cb25575a734f16074070418e805045c1d.tar.gz
Event handler for consume events command
[#168224266]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_event_consumer.erl206
1 files changed, 206 insertions, 0 deletions
diff --git a/src/rabbit_event_consumer.erl b/src/rabbit_event_consumer.erl
new file mode 100644
index 0000000000..d0159000f6
--- /dev/null
+++ b/src/rabbit_event_consumer.erl
@@ -0,0 +1,206 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at https://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2019 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(rabbit_event_consumer).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([register/4]).
+-export([init/1, handle_call/2, handle_event/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {pid, ref, monitor, pattern}).
+
+%%----------------------------------------------------------------------------
+
+register(Pid, Ref, Duration, Pattern) ->
+ case gen_event:add_handler(rabbit_event, ?MODULE, [Pid, Ref, Duration, Pattern]) of
+ ok ->
+ {ok, Ref};
+ Error ->
+ Error
+ end.
+
+%%----------------------------------------------------------------------------
+
+init([Pid, Ref, Duration, Pattern]) ->
+ MRef = erlang:monitor(process, Pid),
+ case Duration of
+ infinity -> infinity;
+ _ -> erlang:send_after(Duration * 1000, self(), rabbit_event_consumer_timeout)
+ end,
+ {ok, #state{pid = Pid, ref = Ref, monitor = MRef, pattern = Pattern}}.
+
+handle_call(_Request, State) -> {ok, not_understood, State}.
+
+handle_event(#event{type = Type,
+ props = Props,
+ timestamp = TS,
+ reference = none}, #state{pid = Pid,
+ ref = Ref,
+ pattern = Pattern} = State) ->
+ case key(Type) of
+ ignore -> ok;
+ Key -> case re:run(Key, Pattern, [{capture, none}]) of
+ match ->
+ Data = [{'event', Key}] ++
+ fmt_proplist([{'timestamp_in_ms', TS} | Props]),
+ Pid ! {Ref, Data, confinue};
+ _ ->
+ ok
+ end
+ end,
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_info({'DOWN', MRef, _, _, _}, #state{monitor = MRef}) ->
+ remove_handler;
+handle_info(rabbit_event_consumer_timeout, #state{pid = Pid, ref = Ref}) ->
+ Pid ! {Ref, <<>>, finished},
+ remove_handler;
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Arg, #state{monitor = MRef}) ->
+ erlang:demonitor(MRef),
+ ok.
+
+code_change(_OldVsn, State, _Extra) -> {ok, State}.
+
+%%----------------------------------------------------------------------------
+
+%% pattern matching is way more efficient that the string operations,
+%% let's use all the keys we're aware of to speed up the handler.
+%% Any unknown or new one will be processed as before (see last function clause).
+key(queue_deleted) ->
+ <<"queue.deleted">>;
+key(queue_created) ->
+ <<"queue.created">>;
+key(exchange_created) ->
+ <<"exchange.created">>;
+key(exchange_deleted) ->
+ <<"exchange.deleted">>;
+key(binding_created) ->
+ <<"binding.created">>;
+key(connection_created) ->
+ <<"connection.created">>;
+key(connection_closed) ->
+ <<"connection.closed">>;
+key(channel_created) ->
+ <<"channel.created">>;
+key(channel_closed) ->
+ <<"channel.closed">>;
+key(consumer_created) ->
+ <<"consumer.created">>;
+key(consumer_deleted) ->
+ <<"consumer.deleted">>;
+key(queue_stats) ->
+ ignore;
+key(connection_stats) ->
+ ignore;
+key(policy_set) ->
+ <<"policy.set">>;
+key(policy_cleared) ->
+ <<"policy.cleared">>;
+key(parameter_set) ->
+ <<"parameter.set">>;
+key(parameter_cleared) ->
+ <<"parameter.cleared">>;
+key(vhost_created) ->
+ <<"vhost.created">>;
+key(vhost_deleted) ->
+ <<"vhost.deleted">>;
+key(vhost_limits_set) ->
+ <<"vhost.limits.set">>;
+key(vhost_limits_cleared) ->
+ <<"vhost.limits.cleared">>;
+key(user_authentication_success) ->
+ <<"user.authentication.success">>;
+key(user_authentication_failure) ->
+ <<"user.authentication.failure">>;
+key(user_created) ->
+ <<"user.created">>;
+key(user_deleted) ->
+ <<"user.deleted">>;
+key(user_password_changed) ->
+ <<"user.password.changed">>;
+key(user_password_cleared) ->
+ <<"user.password.cleared">>;
+key(user_tags_set) ->
+ <<"user.tags.set">>;
+key(permission_created) ->
+ <<"permission.created">>;
+key(permission_deleted) ->
+ <<"permission.deleted">>;
+key(topic_permission_created) ->
+ <<"topic.permission.created">>;
+key(topic_permission_deleted) ->
+ <<"topic.permission.deleted">>;
+key(alarm_set) ->
+ <<"alarm.set">>;
+key(alarm_cleared) ->
+ <<"alarm.cleared">>;
+key(shovel_worker_status) ->
+ <<"shovel.worker.status">>;
+key(shovel_worker_removed) ->
+ <<"shovel.worker.removed">>;
+key(federation_link_status) ->
+ <<"federation.link.status">>;
+key(federation_link_removed) ->
+ <<"federation.link.removed">>;
+key(S) ->
+ case string:tokens(atom_to_list(S), "_") of
+ [_, "stats"] -> ignore;
+ Tokens -> list_to_binary(string:join(Tokens, "."))
+ end.
+
+fmt_proplist(Props) ->
+ lists:foldl(fun({K, V}, Acc) ->
+ case fmt(K, V) of
+ L when is_list(L) -> lists:append(L, Acc);
+ T -> [T | Acc]
+ end
+ end, [], Props).
+
+fmt(K, #resource{virtual_host = VHost,
+ name = Name}) -> [{K, Name},
+ {'vhost', VHost}];
+fmt(K, true) -> {K, true};
+fmt(K, false) -> {K, false};
+fmt(K, V) when is_atom(V) -> {K, atom_to_binary(V, utf8)};
+fmt(K, V) when is_integer(V) -> {K, V};
+fmt(K, V) when is_number(V) -> {K, V};
+fmt(K, V) when is_binary(V) -> {K, V};
+fmt(K, [{_, _}|_] = Vs) -> {K, fmt_proplist(Vs)};
+fmt(K, Vs) when is_list(Vs) -> {K, [fmt(V) || V <- Vs]};
+fmt(K, V) when is_pid(V) -> {K, list_to_binary(rabbit_misc:pid_to_string(V))};
+fmt(K, V) -> {K,
+ list_to_binary(
+ rabbit_misc:format("~1000000000p", [V]))}.
+
+%% Exactly the same as fmt/2, duplicated only for performance issues
+fmt(true) -> true;
+fmt(false) -> false;
+fmt(V) when is_atom(V) -> atom_to_binary(V, utf8);
+fmt(V) when is_integer(V) -> V;
+fmt(V) when is_number(V) -> V;
+fmt(V) when is_binary(V) -> V;
+fmt([{_, _}|_] = Vs) -> fmt_proplist(Vs);
+fmt(Vs) when is_list(Vs) -> [fmt(V) || V <- Vs];
+fmt(V) when is_pid(V) -> list_to_binary(rabbit_misc:pid_to_string(V));
+fmt(V) -> list_to_binary(
+ rabbit_misc:format("~1000000000p", [V])).