summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_alarm.erl2
-rw-r--r--src/rabbit_error_logger.erl4
-rw-r--r--src/rabbit_mirror_queue_sync.erl59
3 files changed, 60 insertions, 5 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl
index c11b6e4383..30743ea243 100644
--- a/src/rabbit_alarm.erl
+++ b/src/rabbit_alarm.erl
@@ -50,7 +50,7 @@
alarms :: [alarm()]}).
-type(local_alarm() :: 'file_descriptor_limit').
--type(resource_alarm_source() :: 'disk' | 'node').
+-type(resource_alarm_source() :: 'disk' | 'memory').
-type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}).
-type(alarm() :: local_alarm() | resource_alarm()).
diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl
index bbaf9577f9..d847284243 100644
--- a/src/rabbit_error_logger.erl
+++ b/src/rabbit_error_logger.erl
@@ -104,9 +104,11 @@ publish1(RoutingKey, Format, Data, LogExch) ->
Timestamp = time_compat:os_system_time(seconds),
Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data],
+ Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}],
{ok, _DeliveredQPids} =
rabbit_basic:publish(LogExch, RoutingKey,
#'P_basic'{content_type = <<"text/plain">>,
- timestamp = Timestamp},
+ timestamp = Timestamp,
+ headers = Headers},
list_to_binary(io_lib:format(Format, Args))),
ok.
diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl
index 2d8bdfa860..a97a9b50c8 100644
--- a/src/rabbit_mirror_queue_sync.erl
+++ b/src/rabbit_mirror_queue_sync.erl
@@ -18,7 +18,7 @@
-include("rabbit.hrl").
--export([master_prepare/4, master_go/8, slave/7]).
+-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]).
-define(SYNC_PROGRESS_INTERVAL, 1000000).
@@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) ->
[] -> Log("all slaves already synced", []);
SPids1 -> MPid ! {ready, self()},
Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]),
- syncer_loop(Ref, MPid, SPids1)
+ syncer_check_resources(Ref, MPid, SPids1)
end.
await_slaves(Ref, SPids) ->
@@ -217,12 +217,43 @@ await_slaves(Ref, SPids) ->
%% 'sync_start' and so will not reply. We need to act as though they are
%% down.
+syncer_check_resources(Ref, MPid, SPids) ->
+ rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}),
+ %% Before we ask the master node to send the first batch of messages
+ %% over here, we check if one node is already short on memory. If
+ %% that's the case, we wait for the alarm to be cleared before
+ %% starting the syncer loop.
+ AlarmedNodes = lists:any(
+ fun
+ ({{resource_limit, memory, _}, _}) -> true;
+ ({_, _}) -> false
+ end, rabbit_alarm:get_alarms()),
+ if
+ not AlarmedNodes ->
+ MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids);
+ true ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> MPid ! {next, Ref},
+ syncer_loop(Ref, MPid, SPids1)
+ end
+ end.
+
syncer_loop(Ref, MPid, SPids) ->
- MPid ! {next, Ref},
receive
+ {conserve_resources, memory, true} ->
+ case wait_for_resources(Ref, SPids) of
+ cancel -> ok;
+ SPids1 -> syncer_loop(Ref, MPid, SPids1)
+ end;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ syncer_loop(Ref, MPid, SPids);
{msgs, Ref, Msgs} ->
SPids1 = wait_for_credit(SPids),
broadcast(SPids1, {sync_msgs, Ref, Msgs}),
+ MPid ! {next, Ref},
syncer_loop(Ref, MPid, SPids1);
{cancel, Ref} ->
%% We don't tell the slaves we will die - so when we do
@@ -239,6 +270,10 @@ broadcast(SPids, Msg) ->
SPid ! Msg
end || SPid <- SPids].
+conserve_resources(Pid, Source, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Source, Conserve},
+ ok.
+
wait_for_credit(SPids) ->
case credit_flow:blocked() of
true -> receive
@@ -252,6 +287,24 @@ wait_for_credit(SPids) ->
false -> SPids
end.
+wait_for_resources(Ref, SPids) ->
+ receive
+ {conserve_resources, memory, false} ->
+ SPids;
+ {conserve_resources, _, _} ->
+ %% Ignore other alerts.
+ wait_for_resources(Ref, SPids);
+ {cancel, Ref} ->
+ %% We don't tell the slaves we will die - so when we do
+ %% they interpret that as a failure, which is what we
+ %% want.
+ cancel;
+ {'DOWN', _, process, SPid, _} ->
+ credit_flow:peer_down(SPid),
+ SPids1 = wait_for_credit(lists:delete(SPid, SPids)),
+ wait_for_resources(Ref, SPids1)
+ end.
+
%% Syncer
%% ---------------------------------------------------------------------------
%% Slave