summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_channel.erl2
-rw-r--r--src/rabbit_reader.erl23
-rw-r--r--src/rabbit_vhost.erl2
4 files changed, 22 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 2795e31761..4517aade45 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -165,6 +165,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName},
fun (BQS) ->
BQS1 = BQ:delete_and_terminate(Reason, BQS),
%% don't care if the internal delete doesn't return 'ok'.
+ rabbit_event:if_enabled(State, #q.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_amqqueue:internal_delete(QName),
BQS1
end, State).
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 614e307c0b..1c63e96bdf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -371,6 +371,8 @@ terminate(Reason, State) ->
_ -> ok
end,
pg_local:leave(rabbit_channels, self()),
+ rabbit_event:if_enabled(State, #ch.stats_timer,
+ fun() -> emit_stats(State) end),
rabbit_event:notify(channel_closed, [{pid, self()}]).
code_change(_OldVsn, State, _Extra) ->
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index ae8327498e..b98818a61f 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -298,11 +298,13 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) ->
{data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf],
buf_len = BufLen + size(Data),
pending_recv = false});
- closed -> case State#v1.connection_state of
+ closed -> maybe_emit_stats(State),
+ case State#v1.connection_state of
closed -> State;
_ -> throw(connection_closed_abruptly)
end;
- {error, Reason} -> throw({inet_error, Reason});
+ {error, Reason} -> maybe_emit_stats(State),
+ throw({inet_error, Reason});
{other, Other} -> handle_other(Other, Deb, State)
end.
@@ -325,9 +327,11 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) ->
%% ordinary error case. However, since this termination is
%% initiated by our parent it is probably more important to exit
%% quickly.
+ maybe_emit_stats(State),
exit(Reason);
handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}},
- _Deb, _State) ->
+ _Deb, State) ->
+ maybe_emit_stats(State),
throw(E);
handle_other({channel_exit, Channel, Reason}, Deb, State) ->
mainloop(Deb, handle_exception(State, Channel, Reason));
@@ -342,7 +346,8 @@ handle_other(handshake_timeout, _Deb, State) ->
throw({handshake_timeout, State#v1.callback});
handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) ->
mainloop(Deb, State);
-handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) ->
+handle_other(heartbeat_timeout, _Deb, State = #v1{connection_state = S}) ->
+ maybe_emit_stats(State),
throw({heartbeat_timeout, S});
handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) ->
{ForceTermination, NewState} = terminate(Explanation, State),
@@ -376,8 +381,9 @@ handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) ->
handle_other({bump_credit, Msg}, Deb, State) ->
credit_flow:handle_bump_msg(Msg),
recvloop(Deb, control_throttle(State));
-handle_other(Other, _Deb, _State) ->
+handle_other(Other, _Deb, State) ->
%% internal error -> something worth dying for
+ maybe_emit_stats(State),
exit({unexpected_message, Other}).
switch_callback(State, Callback, Length) ->
@@ -839,8 +845,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath},
rabbit_event:notify(connection_created,
[{type, network} |
infos(?CREATION_EVENT_KEYS, State1)]),
- rabbit_event:if_enabled(State1, #v1.stats_timer,
- fun() -> emit_stats(State1) end),
+ maybe_emit_stats(State1),
State1;
handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) ->
lists:foreach(fun rabbit_channel:shutdown/1, all_channels()),
@@ -1002,6 +1007,10 @@ cert_info(F, #v1{sock = Sock}) ->
{ok, Cert} -> list_to_binary(F(Cert))
end.
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #v1.stats_timer,
+ fun() -> emit_stats(State) end).
+
emit_stats(State) ->
rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)),
rabbit_event:reset_stats_timer(State, #v1.stats_timer).
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 0bb18f4c7e..58e77efb80 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -70,6 +70,7 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
+ rabbit_event:notify(vhost_created, info(VHostPath)),
R.
delete(VHostPath) ->
@@ -87,6 +88,7 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
+ ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]),
R.
internal_delete(VHostPath) ->