summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2017-03-08 20:50:40 +0300
committerMichael Klishin <michael@clojurewerkz.org>2017-03-08 20:50:40 +0300
commit1f6d6d4fcc2107a69bb5fd91601f9168adc97d54 (patch)
treeee84abc078dcad96b5e8e63f09688f3deb4d5f3d /src
parentbf74a923c59a6ec6bc8f17be803f3281a3047ce0 (diff)
parentb1ce3f16a605ad9f06fe98c5e7ff49ba23f4b1f4 (diff)
downloadrabbitmq-server-git-1f6d6d4fcc2107a69bb5fd91601f9168adc97d54.tar.gz
Merge branch 'stable' into rabbitmq-server-1122
Diffstat (limited to 'src')
-rw-r--r--src/rabbit.erl2
-rw-r--r--src/rabbit_alarm.erl10
-rw-r--r--src/rabbit_cli.erl2
-rw-r--r--src/rabbit_core_metrics_gc.erl27
-rw-r--r--src/rabbit_queue_index.erl41
5 files changed, 56 insertions, 26 deletions
diff --git a/src/rabbit.erl b/src/rabbit.erl
index bed7058057..c1f89af574 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -475,7 +475,7 @@ start_apps(Apps) ->
prompt ->
IoDevice = get_input_iodevice(),
io:setopts(IoDevice, [{echo, false}]),
- PP = lists:droplast(io:get_line(IoDevice,
+ PP = rabbit_misc:lists_droplast(io:get_line(IoDevice,
"\nPlease enter the passphrase to unlock encrypted "
"configuration entries.\n\nPassphrase: ")),
io:setopts(IoDevice, [{echo, true}]),
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index daf2c167fa..9466dd0eeb 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -128,8 +128,12 @@ handle_call(_Request, State) ->
handle_event({set_alarm, {{resource_limit, Source, Node}, []}}, State) ->
case is_node_alarmed(Source, Node, State) of
- true -> {ok, State};
- false -> handle_set_resource_alarm(Source, Node, State)
+ true ->
+ {ok, State};
+ false ->
+ rabbit_event:notify(alarm_set, [{source, Source},
+ {node, Node}]),
+ handle_set_resource_alarm(Source, Node, State)
end;
handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
case lists:member(Alarm, Alarms) of
@@ -141,6 +145,8 @@ handle_event({set_alarm, Alarm}, State = #alarms{alarms = Alarms}) ->
handle_event({clear_alarm, {resource_limit, Source, Node}}, State) ->
case is_node_alarmed(Source, Node, State) of
true ->
+ rabbit_event:notify(alarm_cleared, [{source, Source},
+ {node, Node}]),
handle_clear_resource_alarm(Source, Node, State);
false ->
{ok, State}
diff --git a/src/rabbit_cli.erl b/src/rabbit_cli.erl
index 1feda43b6e..fb4ce328ee 100644
--- a/src/rabbit_cli.erl
+++ b/src/rabbit_cli.erl
@@ -277,7 +277,7 @@ mutually_exclusive_flags(CurrentOptionValues, Default, FlagsAndValues) ->
{ok, Value};
_ ->
Names = [ [$', N, $'] || {N, _} <- PresentFlags ],
- CommaSeparated = string:join(lists:droplast(Names), ", "),
+ CommaSeparated = string:join(rabbit_misc:lists_droplast(Names), ", "),
AndOneMore = lists:last(Names),
Msg = io_lib:format("Options ~s and ~s are mutually exclusive", [CommaSeparated, AndOneMore]),
{error, lists:flatten(Msg)}
diff --git a/src/rabbit_core_metrics_gc.erl b/src/rabbit_core_metrics_gc.erl
index e7f848bbc3..3141fdc301 100644
--- a/src/rabbit_core_metrics_gc.erl
+++ b/src/rabbit_core_metrics_gc.erl
@@ -64,7 +64,6 @@ gc_connections() ->
gc_process(connection_coarse_metrics).
gc_channels() ->
- %% TODO channel stats
gc_process(channel_created),
gc_process(channel_metrics),
gc_process(channel_process_metrics),
@@ -96,16 +95,17 @@ gc_gen_server2() ->
gc_process(Table) ->
ets:foldl(fun({Pid = Key, _}, none) ->
gc_process(Pid, Table, Key);
+ ({Pid = Key, _, _, _, _}, none) ->
+ gc_process(Pid, Table, Key);
({Pid = Key, _, _, _}, none) ->
gc_process(Pid, Table, Key)
end, none, Table).
gc_process(Pid, Table, Key) ->
- case erlang:is_process_alive(Pid) of
+ case rabbit_misc:is_process_alive(Pid) of
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
@@ -115,6 +115,8 @@ gc_entity(Table, GbSet) ->
gc_entity(Id, Table, Key, GbSet);
({Id = Key, _}, none) ->
gc_entity(Id, Table, Key, GbSet);
+ ({Id = Key, _, _}, none) ->
+ gc_entity(Id, Table, Key, GbSet);
({Id = Key, _, _, _, _}, none) ->
gc_entity(Id, Table, Key, GbSet)
end, none, Table).
@@ -124,40 +126,35 @@ gc_entity(Id, Table, Key, GbSet) ->
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
gc_process_and_entity(Table, GbSet) ->
- ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _}, none)
+ ets:foldl(fun({{Pid, Id} = Key, _, _, _, _, _, _, _}, none)
when Table == channel_queue_metrics ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
- ({{Pid, Id} = Key, _, _, _}, none)
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
+ ({{Pid, Id} = Key, _, _, _, _}, none)
when Table == channel_exchange_metrics ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{Id, Pid, _} = Key, _, _, _, _}, none)
when Table == consumer_created ->
- gc_entity(Id, Table, Key, GbSet),
- gc_process(Pid, Table, Key);
+ gc_process_and_entity(Id, Pid, Table, Key, GbSet);
({{{Pid, Id}, _} = Key, _, _, _, _}, none) ->
gc_process_and_entity(Id, Pid, Table, Key, GbSet)
end, none, Table).
gc_process_and_entity(Id, Pid, Table, Key, GbSet) ->
- case erlang:is_process_alive(Pid) orelse gb_sets:is_member(Id, GbSet) of
+ case rabbit_misc:is_process_alive(Pid) andalso gb_sets:is_member(Id, GbSet) of
true ->
none;
false ->
- %% TODO catch?
ets:delete(Table, Key),
none
end.
gc_process_and_entities(Table, QueueGbSet, ExchangeGbSet) ->
- ets:foldl(fun({{Pid, {Q, X}} = Key, _}, none) ->
+ ets:foldl(fun({{Pid, {Q, X}} = Key, _, _}, none) ->
gc_process(Pid, Table, Key),
gc_entity(Q, Table, Key, QueueGbSet),
gc_entity(X, Table, Key, ExchangeGbSet)
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 6560d61625..f1a240eed8 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -176,13 +176,40 @@
%%----------------------------------------------------------------------------
--record(qistate, {dir, segments, journal_handle, dirty_count,
- max_journal_entries, on_sync, on_sync_msg,
- unconfirmed, unconfirmed_msg,
- pre_publish_cache, delivered_cache}).
-
--record(segment, {num, path, journal_entries,
- entries_to_segment, unacked}).
+-record(qistate, {
+ %% queue directory where segment and journal files are stored
+ dir,
+ %% map of #segment records
+ segments,
+ %% journal file handle obtained from/used by file_handle_cache
+ journal_handle,
+ %% how many not yet flushed entries are there
+ dirty_count,
+ %% this many not yet flushed journal entries will force a flush
+ max_journal_entries,
+ %% callback function invoked when a message is "handled"
+ %% by the index and potentially can be confirmed to the publisher
+ on_sync,
+ on_sync_msg,
+ %% set of IDs of unconfirmed [to publishers] messages
+ unconfirmed,
+ unconfirmed_msg,
+ %% optimisation
+ pre_publish_cache,
+ %% optimisation
+ delivered_cache}).
+
+-record(segment, {
+ %% segment ID (an integer)
+ num,
+ %% segment file path (see also ?SEGMENT_EXTENSION)
+ path,
+ %% index operation log entries in this segment
+ journal_entries,
+ entries_to_segment,
+ %% counter of unacknowledged messages
+ unacked
+}).
-include("rabbit.hrl").