diff options
| author | Simon MacMullen <simon@rabbitmq.com> | 2013-01-23 16:55:19 +0000 |
|---|---|---|
| committer | Simon MacMullen <simon@rabbitmq.com> | 2013-01-23 16:55:19 +0000 |
| commit | b2bd9d180c237636e8b1894eaf7d190872fbf4f2 (patch) | |
| tree | 5681c7cbad1d1045a4ac6d80ba8a6aa572a9406b /src | |
| parent | 7d255f537200ecd16a7a706f5f773b6299606827 (diff) | |
| parent | 7cb7742188164cc0ce0927ee5d4983e5345e58a9 (diff) | |
| download | rabbitmq-server-git-b2bd9d180c237636e8b1894eaf7d190872fbf4f2.tar.gz | |
Merge default
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 23 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 2 |
4 files changed, 22 insertions, 7 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 2795e31761..4517aade45 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -165,6 +165,8 @@ terminate(Reason, State = #q{q = #amqqueue{name = QName}, fun (BQS) -> BQS1 = BQ:delete_and_terminate(Reason, BQS), %% don't care if the internal delete doesn't return 'ok'. + rabbit_event:if_enabled(State, #q.stats_timer, + fun() -> emit_stats(State) end), rabbit_amqqueue:internal_delete(QName), BQS1 end, State). diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 614e307c0b..1c63e96bdf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -371,6 +371,8 @@ terminate(Reason, State) -> _ -> ok end, pg_local:leave(rabbit_channels, self()), + rabbit_event:if_enabled(State, #ch.stats_timer, + fun() -> emit_stats(State) end), rabbit_event:notify(channel_closed, [{pid, self()}]). code_change(_OldVsn, State, _Extra) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index ae8327498e..b98818a61f 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -298,11 +298,13 @@ mainloop(Deb, State = #v1{sock = Sock, buf = Buf, buf_len = BufLen}) -> {data, Data} -> recvloop(Deb, State#v1{buf = [Data | Buf], buf_len = BufLen + size(Data), pending_recv = false}); - closed -> case State#v1.connection_state of + closed -> maybe_emit_stats(State), + case State#v1.connection_state of closed -> State; _ -> throw(connection_closed_abruptly) end; - {error, Reason} -> throw({inet_error, Reason}); + {error, Reason} -> maybe_emit_stats(State), + throw({inet_error, Reason}); {other, Other} -> handle_other(Other, Deb, State) end. @@ -325,9 +327,11 @@ handle_other({'EXIT', Parent, Reason}, _Deb, State = #v1{parent = Parent}) -> %% ordinary error case. However, since this termination is %% initiated by our parent it is probably more important to exit %% quickly. + maybe_emit_stats(State), exit(Reason); handle_other({channel_exit, _Channel, E = {writer, send_failed, _Error}}, - _Deb, _State) -> + _Deb, State) -> + maybe_emit_stats(State), throw(E); handle_other({channel_exit, Channel, Reason}, Deb, State) -> mainloop(Deb, handle_exception(State, Channel, Reason)); @@ -342,7 +346,8 @@ handle_other(handshake_timeout, _Deb, State) -> throw({handshake_timeout, State#v1.callback}); handle_other(heartbeat_timeout, Deb, State = #v1{connection_state = closed}) -> mainloop(Deb, State); -handle_other(heartbeat_timeout, _Deb, #v1{connection_state = S}) -> +handle_other(heartbeat_timeout, _Deb, State = #v1{connection_state = S}) -> + maybe_emit_stats(State), throw({heartbeat_timeout, S}); handle_other({'$gen_call', From, {shutdown, Explanation}}, Deb, State) -> {ForceTermination, NewState} = terminate(Explanation, State), @@ -376,8 +381,9 @@ handle_other({system, From, Request}, Deb, State = #v1{parent = Parent}) -> handle_other({bump_credit, Msg}, Deb, State) -> credit_flow:handle_bump_msg(Msg), recvloop(Deb, control_throttle(State)); -handle_other(Other, _Deb, _State) -> +handle_other(Other, _Deb, State) -> %% internal error -> something worth dying for + maybe_emit_stats(State), exit({unexpected_message, Other}). switch_callback(State, Callback, Length) -> @@ -839,8 +845,7 @@ handle_method0(#'connection.open'{virtual_host = VHostPath}, rabbit_event:notify(connection_created, [{type, network} | infos(?CREATION_EVENT_KEYS, State1)]), - rabbit_event:if_enabled(State1, #v1.stats_timer, - fun() -> emit_stats(State1) end), + maybe_emit_stats(State1), State1; handle_method0(#'connection.close'{}, State) when ?IS_RUNNING(State) -> lists:foreach(fun rabbit_channel:shutdown/1, all_channels()), @@ -1002,6 +1007,10 @@ cert_info(F, #v1{sock = Sock}) -> {ok, Cert} -> list_to_binary(F(Cert)) end. +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #v1.stats_timer, + fun() -> emit_stats(State) end). + emit_stats(State) -> rabbit_event:notify(connection_stats, infos(?STATISTICS_KEYS, State)), rabbit_event:reset_stats_timer(State, #v1.stats_timer). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 0bb18f4c7e..58e77efb80 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -70,6 +70,7 @@ add(VHostPath) -> {<<"amq.rabbitmq.trace">>, topic}]], ok end), + rabbit_event:notify(vhost_created, info(VHostPath)), R. delete(VHostPath) -> @@ -87,6 +88,7 @@ delete(VHostPath) -> with(VHostPath, fun () -> ok = internal_delete(VHostPath) end)), + ok = rabbit_event:notify(vhost_deleted, [{name, VHostPath}]), R. internal_delete(VHostPath) -> |
