diff options
| author | Michael Klishin <mklishin@pivotal.io> | 2016-02-18 18:30:18 +0300 |
|---|---|---|
| committer | Michael Klishin <mklishin@pivotal.io> | 2016-02-18 18:30:18 +0300 |
| commit | c66d51f4f57c425bfa2fa152bd4895ed56915ceb (patch) | |
| tree | 0214fe81f65323a7eda762306bf443bbe0a4e6b7 /src | |
| parent | 4152ecbc7a7a4d0223a4f51f542149da04f37692 (diff) | |
| parent | 01e039f4dd2b6d78d9d67f7472bc06394f320fc3 (diff) | |
| download | rabbitmq-server-git-c66d51f4f57c425bfa2fa152bd4895ed56915ceb.tar.gz | |
Merge branch 'stable' into rabbitmq-server-248
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_alarm.erl | 2 |
| -rw-r--r-- | src/rabbit_error_logger.erl | 4 |
| -rw-r--r-- | src/rabbit_mirror_queue_sync.erl | 59 |
3 files changed, 60 insertions, 5 deletions
diff --git a/src/rabbit_alarm.erl b/src/rabbit_alarm.erl index c11b6e4383..30743ea243 100644 --- a/src/rabbit_alarm.erl +++ b/src/rabbit_alarm.erl @@ -50,7 +50,7 @@ alarms :: [alarm()]}). -type(local_alarm() :: 'file_descriptor_limit'). --type(resource_alarm_source() :: 'disk' | 'node'). +-type(resource_alarm_source() :: 'disk' | 'memory'). -type(resource_alarm() :: {resource_limit, resource_alarm_source(), node()}). -type(alarm() :: local_alarm() | resource_alarm()). diff --git a/src/rabbit_error_logger.erl b/src/rabbit_error_logger.erl index bbaf9577f9..d847284243 100644 --- a/src/rabbit_error_logger.erl +++ b/src/rabbit_error_logger.erl @@ -104,9 +104,11 @@ publish1(RoutingKey, Format, Data, LogExch) -> Timestamp = time_compat:os_system_time(seconds), Args = [truncate:term(A, ?LOG_TRUNC) || A <- Data], + Headers = [{<<"node">>, longstr, list_to_binary(atom_to_list(node()))}], {ok, _DeliveredQPids} = rabbit_basic:publish(LogExch, RoutingKey, #'P_basic'{content_type = <<"text/plain">>, - timestamp = Timestamp}, + timestamp = Timestamp, + headers = Headers}, list_to_binary(io_lib:format(Format, Args))), ok. diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2d8bdfa860..a97a9b50c8 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7]). +-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) -> [] -> Log("all slaves already synced", []); SPids1 -> MPid ! {ready, self()}, Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), - syncer_loop(Ref, MPid, SPids1) + syncer_check_resources(Ref, MPid, SPids1) end. await_slaves(Ref, SPids) -> @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) -> %% 'sync_start' and so will not reply. We need to act as though they are %% down. 
+syncer_check_resources(Ref, MPid, SPids) -> + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + %% Before we ask the master node to send the first batch of messages + %% over here, we check if one node is already short on memory. If + %% that's the case, we wait for the alarm to be cleared before + %% starting the syncer loop. + AlarmedNodes = lists:any( + fun + ({{resource_limit, memory, _}, _}) -> true; + ({_, _}) -> false + end, rabbit_alarm:get_alarms()), + if + not AlarmedNodes -> + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids); + true -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end + end. + syncer_loop(Ref, MPid, SPids) -> - MPid ! {next, Ref}, receive + {conserve_resources, memory, true} -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> syncer_loop(Ref, MPid, SPids1) + end; + {conserve_resources, _, _} -> + %% Ignore other alerts. + syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -239,6 +270,10 @@ broadcast(SPids, Msg) -> SPid ! Msg end || SPid <- SPids]. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, + ok. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -252,6 +287,24 @@ wait_for_credit(SPids) -> false -> SPids end. +wait_for_resources(Ref, SPids) -> + receive + {conserve_resources, memory, false} -> + SPids; + {conserve_resources, _, _} -> + %% Ignore other alerts. + wait_for_resources(Ref, SPids); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. 
+ cancel; + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + SPids1 = wait_for_credit(lists:delete(SPid, SPids)), + wait_for_resources(Ref, SPids1) + end. + %% Syncer %% --------------------------------------------------------------------------- %% Slave |
