summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/file_handle_cache.erl89
-rw-r--r--src/rabbit_networking.erl6
-rw-r--r--src/rabbit_node_monitor.erl20
-rw-r--r--src/rabbit_nodes.erl3
-rw-r--r--src/rabbit_priority_queue.erl18
5 files changed, 101 insertions, 35 deletions
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index 5a916c75bc..d0fd524fb8 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -30,9 +30,9 @@
%% may happen, especially for writes.
%% 3) Writes are all appends. You cannot write to the middle of a
%% file, although you can truncate and then append if you want.
-%% 4) Although there is a write buffer, there is no read buffer. Feel
-%% free to use the read_ahead mode, but beware of the interaction
-%% between that buffer and the write buffer.
+%% 4) There are read and write buffers. Feel free to use the read_ahead
+%% mode, but beware of the interaction between that buffer and the write
+%% buffer.
%%
%% Some benefits
%% 1) You do not have to remember to call sync before close
@@ -180,8 +180,10 @@
write_buffer,
read_buffer,
read_buffer_pos,
- read_buffer_rem, %% Num of bytes from pos to end
- read_buffer_size_limit,
+ read_buffer_rem, %% Num of bytes from pos to end
+ read_buffer_size, %% Next size of read buffer to use
+ read_buffer_size_limit, %% Max size of read buffer to use
+ read_buffer_usage, %% Bytes we have read from it, for tuning
at_eof,
path,
mode,
@@ -339,22 +341,27 @@ read(Ref, Count) ->
[Ref], keep,
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
- ([Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- offset = Offset}]) when BufRem >= Count ->
+ ([Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_usage = BufUsg,
+ offset = Offset}])
+ when BufRem >= Count ->
<<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf,
- {{ok, Res}, [Handle#handle{offset = Offset + Count,
- read_buffer_pos = BufPos + Count,
- read_buffer_rem = BufRem - Count}]};
- ([Handle = #handle{read_buffer = Buf,
- read_buffer_pos = BufPos,
- read_buffer_rem = BufRem,
- read_buffer_size_limit = BufSzLimit,
- hdl = Hdl,
- offset = Offset}]) ->
+ {{ok, Res}, [Handle#handle{offset = Offset + Count,
+ read_buffer_pos = BufPos + Count,
+ read_buffer_rem = BufRem - Count,
+ read_buffer_usage = BufUsg + Count }]};
+ ([Handle0]) ->
+ Handle = #handle{read_buffer = Buf,
+ read_buffer_pos = BufPos,
+ read_buffer_rem = BufRem,
+ read_buffer_size = BufSz,
+ hdl = Hdl,
+ offset = Offset}
+ = tune_read_buffer_limit(Handle0, Count),
WantedCount = Count - BufRem,
- case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of
+ case prim_file_read(Hdl, lists:max([BufSz, WantedCount])) of
{ok, Data} ->
<<_:BufPos/binary, BufTl/binary>> = Buf,
ReadCount = size(Data),
@@ -369,10 +376,11 @@ read(Ref, Count) ->
OffSet1 = Offset + BufRem + WantedCount,
BufRem1 = ReadCount - WantedCount,
{{ok, <<BufTl/binary, Hd/binary>>},
- [Handle#handle{offset = OffSet1,
- read_buffer = Data,
- read_buffer_pos = WantedCount,
- read_buffer_rem = BufRem1}]}
+ [Handle#handle{offset = OffSet1,
+ read_buffer = Data,
+ read_buffer_pos = WantedCount,
+ read_buffer_rem = BufRem1,
+ read_buffer_usage = WantedCount}]}
end;
eof ->
{eof, [Handle #handle { at_eof = true }]};
@@ -789,7 +797,9 @@ new_closed_handle(Path, Mode, Options) ->
read_buffer = <<>>,
read_buffer_pos = 0,
read_buffer_rem = 0,
+ read_buffer_size = ReadBufferSize,
read_buffer_size_limit = ReadBufferSize,
+ read_buffer_usage = 0,
at_eof = false,
path = Path,
mode = Mode,
@@ -919,6 +929,39 @@ reset_read_buffer(Handle) ->
read_buffer_pos = 0,
read_buffer_rem = 0}.
+%% We come into this function whenever there's been a miss while
+%% reading from the buffer - but note that when we first start with a
+%% new handle the usage will be 0. Therefore in that case don't take
+%% it as meaning the buffer was useless; we just haven't done anything
+%% yet!
+tune_read_buffer_limit(Handle = #handle{read_buffer_usage = 0}, _Count) ->
+ Handle;
+%% In this head we have been using the buffer but now tried to read
+%% outside it. So how did we do? If we used less than the size of the
+%% buffer, make the new buffer the size of what we used before, but
+%% add one byte (so that next time we can distinguish between getting
+%% the buffer size exactly right and actually wanting more). If we
+%% read 100% of what we had, then double it for next time, up to the
+%% limit that was set when we were created.
+tune_read_buffer_limit(Handle = #handle{read_buffer = Buf,
+ read_buffer_usage = Usg,
+ read_buffer_size = Sz,
+ read_buffer_size_limit = Lim}, Count) ->
+    %% If the buffer is <<>> then we are in the first read after a
+    %% reset, and the read_buffer_usage is the total usage from before
+    %% the reset. Otherwise we are in a read which read off the end of
+    %% the buffer, so really the size of this read should be included
+    %% in the usage.
+ TotalUsg = case Buf of
+ <<>> -> Usg;
+ _ -> Usg + Count
+ end,
+ Handle#handle{read_buffer_usage = 0,
+ read_buffer_size = erlang:min(case TotalUsg < Sz of
+ true -> Usg + 1;
+ false -> Usg * 2
+ end, Lim)}.
+
infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].
i(total_limit, #fhc_state{limit = Limit}) -> Limit;
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 1a2883748f..f5deaef388 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -394,7 +394,11 @@ node_listeners(Node) ->
mnesia:dirty_read(rabbit_listener, Node).
on_node_down(Node) ->
- ok = mnesia:dirty_delete(rabbit_listener, Node).
+ case lists:member(Node, nodes()) of
+ false -> ok = mnesia:dirty_delete(rabbit_listener, Node);
+ true -> rabbit_log:info(
+ "Keep ~s listeners: the node is already back~n", [Node])
+ end.
start_client(Sock, SockTransform) ->
{ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []),
diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl
index 94a95deb0d..0f00e66e55 100644
--- a/src/rabbit_node_monitor.erl
+++ b/src/rabbit_node_monitor.erl
@@ -633,13 +633,13 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) ->
%% back.
case application:get_env(rabbit, cluster_partition_handling) of
{ok, pause_minority} ->
- case majority() of
+ case majority([Node]) of
true -> ok;
false -> await_cluster_recovery(fun majority/0)
end,
State;
{ok, {pause_if_all_down, PreferredNodes, HowToRecover}} ->
- case in_preferred_partition(PreferredNodes) of
+ case in_preferred_partition(PreferredNodes, [Node]) of
true -> ok;
false -> await_cluster_recovery(
fun in_preferred_partition/0)
@@ -732,7 +732,9 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions,
%% that we do not attempt to deal with individual (other) partitions
%% going away. It's only safe to forget anything about partitions when
%% there are no partitions.
- Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of
+ Down = Partitions -- alive_rabbit_nodes(),
+ NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running),
+ Partitions1 = case Partitions -- Down -- NoLongerPartitioned of
[] -> [];
_ -> Partitions
end,
@@ -825,8 +827,12 @@ disconnect(Node) ->
%% about whether we connect to nodes which are currently disconnected.
majority() ->
+ majority([]).
+
+majority(NodesDown) ->
Nodes = rabbit_mnesia:cluster_nodes(all),
- length(alive_nodes(Nodes)) / length(Nodes) > 0.5.
+ AliveNodes = alive_nodes(Nodes) -- NodesDown,
+ length(AliveNodes) / length(Nodes) > 0.5.
in_preferred_partition() ->
{ok, {pause_if_all_down, PreferredNodes, _}} =
@@ -834,9 +840,13 @@ in_preferred_partition() ->
in_preferred_partition(PreferredNodes).
in_preferred_partition(PreferredNodes) ->
+ in_preferred_partition(PreferredNodes, []).
+
+in_preferred_partition(PreferredNodes, NodesDown) ->
Nodes = rabbit_mnesia:cluster_nodes(all),
RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)],
- RealPreferredNodes =:= [] orelse alive_nodes(RealPreferredNodes) =/= [].
+ AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown,
+ RealPreferredNodes =:= [] orelse AliveNodes =/= [].
all_nodes_up() ->
Nodes = rabbit_mnesia:cluster_nodes(all),
diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl
index d3c0e55ff8..bbe0d35719 100644
--- a/src/rabbit_nodes.erl
+++ b/src/rabbit_nodes.erl
@@ -200,11 +200,10 @@ set_cluster_name(Name) ->
rabbit_runtime_parameters:set_global(cluster_name, Name).
ensure_epmd() ->
- {ok, Root} = init:get_argument(root),
{ok, Prog} = init:get_argument(progname),
ID = random:uniform(1000000000),
Port = open_port(
- {spawn_executable, filename:join([Root, "bin", Prog])},
+ {spawn_executable, os:find_executable(Prog)},
[{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]),
"-noshell", "-eval", "halt()."]},
exit_status, stderr_to_stdout, use_stdio]),
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 1d9522f613..e20e4b4c14 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -332,7 +332,7 @@ set_ram_duration_target(DurationTarget,
?passthrough1(set_ram_duration_target(DurationTarget, BQS)).
ram_duration(State = #state{bq = BQ}) ->
- fold_add2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State);
+ fold_min2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State);
ram_duration(State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(ram_duration(BQS)).
@@ -459,6 +459,13 @@ fold_add2(Fun, State) ->
{add_maybe_infinity(Res, Acc), BQSN1}
end, 0, State).
+%% Fold over results assuming results are numbers and we want the minimum
+fold_min2(Fun, State) ->
+ fold2(fun (P, BQSN, Acc) ->
+ {Res, BQSN1} = Fun(P, BQSN),
+ {erlang:min(Res, Acc), BQSN1}
+ end, infinity, State).
+
%% Fold over results assuming results are lists and we want to append
%% them, and also that we have some AckTags we want to pass in to each
%% invocation.
@@ -526,9 +533,12 @@ a(State = #state{bqss = BQSs}) ->
priority(P, BQSs) when is_integer(P) ->
{P, bq_fetch(P, BQSs)};
-priority(_Msg, [{P, BQSN}]) ->
+priority(#basic_message{content = Content}, BQSs) ->
+ priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs).
+
+priority1(_Content, [{P, BQSN}]) ->
{P, BQSN};
-priority(Msg = #basic_message{content = #content{properties = Props}},
+priority1(Content = #content{properties = Props},
[{P, BQSN} | Rest]) ->
#'P_basic'{priority = Priority0} = Props,
Priority = case Priority0 of
@@ -537,7 +547,7 @@ priority(Msg = #basic_message{content = #content{properties = Props}},
end,
case Priority >= P of
true -> {P, BQSN};
- false -> priority(Msg, Rest)
+ false -> priority1(Content, Rest)
end.
add_maybe_infinity(infinity, _) -> infinity;