diff options
| author | Alex Thomas <alext@lshift.net> | 2015-03-25 17:42:55 +0000 |
|---|---|---|
| committer | Alex Thomas <alext@lshift.net> | 2015-03-25 17:42:55 +0000 |
| commit | 0ef3053ffa05b1ec042556507b14a2fdb9a4135a (patch) | |
| tree | 6a748695e9333357901af3fbfc02c4a158ccf9d8 | |
| parent | f6dcdf11dccff7e249a4ff5b502c3ee42a96e8b3 (diff) | |
| parent | ada55d3d3f04e909b267b69251a4de12148b69de (diff) | |
| download | rabbitmq-server-git-0ef3053ffa05b1ec042556507b14a2fdb9a4135a.tar.gz | |
Merge branch 'master' into reusable-worker-pool
| -rw-r--r-- | .gitignore | 1 | ||||
| -rw-r--r-- | packaging/RPMS/Fedora/rabbitmq-server.spec | 4 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/changelog | 6 | ||||
| -rw-r--r-- | packaging/debs/Debian/debian/postinst | 1 | ||||
| -rw-r--r-- | src/file_handle_cache.erl | 89 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_nodes.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 18 |
9 files changed, 113 insertions, 35 deletions
diff --git a/.gitignore b/.gitignore index 366e71ae44..1b1e92af4d 100644 --- a/.gitignore +++ b/.gitignore @@ -14,6 +14,7 @@ erl_crash.dump /cover/ /dist/ /ebin/ +/etc/ /plugins/ /priv/plugins/ /deps.mk diff --git a/packaging/RPMS/Fedora/rabbitmq-server.spec b/packaging/RPMS/Fedora/rabbitmq-server.spec index 05781f8b85..115aceb916 100644 --- a/packaging/RPMS/Fedora/rabbitmq-server.spec +++ b/packaging/RPMS/Fedora/rabbitmq-server.spec @@ -95,6 +95,7 @@ fi if [ -f %{_sysconfdir}/rabbitmq/rabbitmq.conf ] && [ ! -f %{_sysconfdir}/rabbitmq/rabbitmq-env.conf ]; then mv %{_sysconfdir}/rabbitmq/rabbitmq.conf %{_sysconfdir}/rabbitmq/rabbitmq-env.conf fi +chmod -R o-rwx,g-w %{_localstatedir}/lib/rabbitmq/mnesia %preun if [ $1 = 0 ]; then @@ -128,6 +129,9 @@ done rm -rf %{buildroot} %changelog +* Wed Mar 11 2015 jean-sebastien@rabbitmq.com 3.5.0-1 +- New Upstream Release + * Wed Feb 11 2015 michael@rabbitmq.com 3.4.4-1 - New Upstream Release diff --git a/packaging/debs/Debian/debian/changelog b/packaging/debs/Debian/debian/changelog index 242ac9d701..b2c805b3ad 100644 --- a/packaging/debs/Debian/debian/changelog +++ b/packaging/debs/Debian/debian/changelog @@ -1,3 +1,9 @@ +rabbitmq-server (3.5.0-1) unstable; urgency=low + + * New Upstream Release + + -- Jean-Sébastien Pédron <jean-sebastien@rabbitmq.com> Wed, 11 Mar 2015 13:56:19 +0000 + rabbitmq-server (3.4.4-1) unstable; urgency=low * New Upstream Release diff --git a/packaging/debs/Debian/debian/postinst b/packaging/debs/Debian/debian/postinst index 7238ba471f..c83881e6ba 100644 --- a/packaging/debs/Debian/debian/postinst +++ b/packaging/debs/Debian/debian/postinst @@ -33,6 +33,7 @@ fi chown -R rabbitmq:rabbitmq /var/lib/rabbitmq chown -R rabbitmq:rabbitmq /var/log/rabbitmq chmod 750 /var/lib/rabbitmq/mnesia +chmod -R o-rwx,g-w /var/lib/rabbitmq/mnesia case "$1" in configure) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index 5a916c75bc..d0fd524fb8 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -30,9 +30,9 @@ %% may happen, especially for writes. %% 3) Writes are all appends. You cannot write to the middle of a %% file, although you can truncate and then append if you want. -%% 4) Although there is a write buffer, there is no read buffer. Feel -%% free to use the read_ahead mode, but beware of the interaction -%% between that buffer and the write buffer. +%% 4) There are read and write buffers. Feel free to use the read_ahead +%% mode, but beware of the interaction between that buffer and the write +%% buffer. %% %% Some benefits %% 1) You do not have to remember to call sync before close @@ -180,8 +180,10 @@ write_buffer, read_buffer, read_buffer_pos, - read_buffer_rem, %% Num of bytes from pos to end - read_buffer_size_limit, + read_buffer_rem, %% Num of bytes from pos to end + read_buffer_size, %% Next size of read buffer to use + read_buffer_size_limit, %% Max size of read buffer to use + read_buffer_usage, %% Bytes we have read from it, for tuning at_eof, path, mode, @@ -339,22 +341,27 @@ read(Ref, Count) -> [Ref], keep, fun ([#handle { is_read = false }]) -> {error, not_open_for_reading}; - ([Handle = #handle{read_buffer = Buf, - read_buffer_pos = BufPos, - read_buffer_rem = BufRem, - offset = Offset}]) when BufRem >= Count -> + ([Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + read_buffer_usage = BufUsg, + offset = Offset}]) + when BufRem >= Count -> <<_:BufPos/binary, Res:Count/binary, _/binary>> = Buf, - {{ok, Res}, [Handle#handle{offset = Offset + Count, - read_buffer_pos = BufPos + Count, - read_buffer_rem = BufRem - Count}]}; - ([Handle = #handle{read_buffer = Buf, - read_buffer_pos = BufPos, - read_buffer_rem = BufRem, - read_buffer_size_limit = BufSzLimit, - hdl = Hdl, - offset = Offset}]) -> + {{ok, Res}, [Handle#handle{offset = Offset + Count, + read_buffer_pos = BufPos + Count, + read_buffer_rem = BufRem - Count, + read_buffer_usage = BufUsg + Count }]}; + ([Handle0]) -> + Handle = #handle{read_buffer = Buf, + read_buffer_pos = BufPos, + read_buffer_rem = BufRem, + read_buffer_size = BufSz, + hdl = Hdl, + offset = Offset} + = tune_read_buffer_limit(Handle0, Count), WantedCount = Count - BufRem, - case prim_file_read(Hdl, lists:max([BufSzLimit, WantedCount])) of + case prim_file_read(Hdl, lists:max([BufSz, WantedCount])) of {ok, Data} -> <<_:BufPos/binary, BufTl/binary>> = Buf, ReadCount = size(Data), @@ -369,10 +376,11 @@ read(Ref, Count) -> OffSet1 = Offset + BufRem + WantedCount, BufRem1 = ReadCount - WantedCount, {{ok, <<BufTl/binary, Hd/binary>>}, - [Handle#handle{offset = OffSet1, - read_buffer = Data, - read_buffer_pos = WantedCount, - read_buffer_rem = BufRem1}]} + [Handle#handle{offset = OffSet1, + read_buffer = Data, + read_buffer_pos = WantedCount, + read_buffer_rem = BufRem1, + read_buffer_usage = WantedCount}]} end; eof -> {eof, [Handle #handle { at_eof = true }]}; @@ -789,7 +797,9 @@ new_closed_handle(Path, Mode, Options) -> read_buffer = <<>>, read_buffer_pos = 0, read_buffer_rem = 0, + read_buffer_size = ReadBufferSize, read_buffer_size_limit = ReadBufferSize, + read_buffer_usage = 0, at_eof = false, path = Path, mode = Mode, @@ -919,6 +929,39 @@ reset_read_buffer(Handle) -> read_buffer_pos = 0, read_buffer_rem = 0}. +%% We come into this function whenever there's been a miss while +%% reading from the buffer - but note that when we first start with a +%% new handle the usage will be 0. Therefore in that case don't take +%% it as meaning the buffer was useless, we just haven't done anything +%% yet! +tune_read_buffer_limit(Handle = #handle{read_buffer_usage = 0}, _Count) -> + Handle; +%% In this head we have been using the buffer but now tried to read +%% outside it. So how did we do? If we used less than the size of the +%% buffer, make the new buffer the size of what we used before, but +%% add one byte (so that next time we can distinguish between getting +%% the buffer size exactly right and actually wanting more). If we +%% read 100% of what we had, then double it for next time, up to the +%% limit that was set when we were created. +tune_read_buffer_limit(Handle = #handle{read_buffer = Buf, + read_buffer_usage = Usg, + read_buffer_size = Sz, + read_buffer_size_limit = Lim}, Count) -> + %% If the buffer is <<>> then we are in the first read after a + %% reset, the read_buffer_usage is the total usage from before the + %% reset. But otherwise we are in a read which read off the end of + %% the buffer, so really the size of this read should be included + %% in the usage. + TotalUsg = case Buf of + <<>> -> Usg; + _ -> Usg + Count + end, + Handle#handle{read_buffer_usage = 0, + read_buffer_size = erlang:min(case TotalUsg < Sz of + true -> Usg + 1; + false -> Usg * 2 + end, Lim)}. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 1a2883748f..f5deaef388 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -394,7 +394,11 @@ node_listeners(Node) -> mnesia:dirty_read(rabbit_listener, Node). on_node_down(Node) -> - ok = mnesia:dirty_delete(rabbit_listener, Node). + case lists:member(Node, nodes()) of + false -> ok = mnesia:dirty_delete(rabbit_listener, Node); + true -> rabbit_log:info( + "Keep ~s listeners: the node is already back~n", [Node]) + end. start_client(Sock, SockTransform) -> {ok, _Child, Reader} = supervisor:start_child(rabbit_tcp_client_sup, []), diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 94a95deb0d..0f00e66e55 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -633,13 +633,13 @@ handle_dead_node(Node, State = #state{autoheal = Autoheal}) -> %% back. case application:get_env(rabbit, cluster_partition_handling) of {ok, pause_minority} -> - case majority() of + case majority([Node]) of true -> ok; false -> await_cluster_recovery(fun majority/0) end, State; {ok, {pause_if_all_down, PreferredNodes, HowToRecover}} -> - case in_preferred_partition(PreferredNodes) of + case in_preferred_partition(PreferredNodes, [Node]) of true -> ok; false -> await_cluster_recovery( fun in_preferred_partition/0) @@ -732,7 +732,9 @@ handle_dead_rabbit(Node, State = #state{partitions = Partitions, %% that we do not attempt to deal with individual (other) partitions %% going away. It's only safe to forget anything about partitions when %% there are no partitions. - Partitions1 = case Partitions -- (Partitions -- alive_rabbit_nodes()) of + Down = Partitions -- alive_rabbit_nodes(), + NoLongerPartitioned = rabbit_mnesia:cluster_nodes(running), + Partitions1 = case Partitions -- Down -- NoLongerPartitioned of [] -> []; _ -> Partitions end, @@ -825,8 +827,12 @@ disconnect(Node) -> %% about whether we connect to nodes which are currently disconnected. majority() -> + majority([]). + +majority(NodesDown) -> Nodes = rabbit_mnesia:cluster_nodes(all), - length(alive_nodes(Nodes)) / length(Nodes) > 0.5. + AliveNodes = alive_nodes(Nodes) -- NodesDown, + length(AliveNodes) / length(Nodes) > 0.5. in_preferred_partition() -> {ok, {pause_if_all_down, PreferredNodes, _}} = @@ -834,9 +840,13 @@ in_preferred_partition() -> in_preferred_partition(PreferredNodes). in_preferred_partition(PreferredNodes) -> + in_preferred_partition(PreferredNodes, []). + +in_preferred_partition(PreferredNodes, NodesDown) -> Nodes = rabbit_mnesia:cluster_nodes(all), RealPreferredNodes = [N || N <- PreferredNodes, lists:member(N, Nodes)], - RealPreferredNodes =:= [] orelse alive_nodes(RealPreferredNodes) =/= []. + AliveNodes = alive_nodes(RealPreferredNodes) -- NodesDown, + RealPreferredNodes =:= [] orelse AliveNodes =/= []. all_nodes_up() -> Nodes = rabbit_mnesia:cluster_nodes(all), diff --git a/src/rabbit_nodes.erl b/src/rabbit_nodes.erl index d3c0e55ff8..bbe0d35719 100644 --- a/src/rabbit_nodes.erl +++ b/src/rabbit_nodes.erl @@ -200,11 +200,10 @@ set_cluster_name(Name) -> rabbit_runtime_parameters:set_global(cluster_name, Name). ensure_epmd() -> - {ok, Root} = init:get_argument(root), {ok, Prog} = init:get_argument(progname), ID = random:uniform(1000000000), Port = open_port( - {spawn_executable, filename:join([Root, "bin", Prog])}, + {spawn_executable, os:find_executable(Prog)}, [{args, ["-sname", rabbit_misc:format("epmd-starter-~b", [ID]), "-noshell", "-eval", "halt()."]}, exit_status, stderr_to_stdout, use_stdio]), diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 1d9522f613..e20e4b4c14 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -332,7 +332,7 @@ set_ram_duration_target(DurationTarget, ?passthrough1(set_ram_duration_target(DurationTarget, BQS)). ram_duration(State = #state{bq = BQ}) -> - fold_add2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State); + fold_min2(fun (_P, BQSN) -> BQ:ram_duration(BQSN) end, State); ram_duration(State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(ram_duration(BQS)). @@ -459,6 +459,13 @@ fold_add2(Fun, State) -> {add_maybe_infinity(Res, Acc), BQSN1} end, 0, State). +%% Fold over results assuming results are numbers and we want the minimum +fold_min2(Fun, State) -> + fold2(fun (P, BQSN, Acc) -> + {Res, BQSN1} = Fun(P, BQSN), + {erlang:min(Res, Acc), BQSN1} + end, infinity, State). + %% Fold over results assuming results are lists and we want to append %% them, and also that we have some AckTags we want to pass in to each %% invocation. @@ -526,9 +533,12 @@ a(State = #state{bqss = BQSs}) -> priority(P, BQSs) when is_integer(P) -> {P, bq_fetch(P, BQSs)}; -priority(_Msg, [{P, BQSN}]) -> +priority(#basic_message{content = Content}, BQSs) -> + priority1(rabbit_binary_parser:ensure_content_decoded(Content), BQSs). + +priority1(_Content, [{P, BQSN}]) -> {P, BQSN}; -priority(Msg = #basic_message{content = #content{properties = Props}}, +priority1(Content = #content{properties = Props}, [{P, BQSN} | Rest]) -> #'P_basic'{priority = Priority0} = Props, Priority = case Priority0 of @@ -537,7 +547,7 @@ priority(Msg = #basic_message{content = #content{properties = Props}}, end, case Priority >= P of true -> {P, BQSN}; - false -> priority(Msg, Rest) + false -> priority1(Content, Rest) end. add_maybe_infinity(infinity, _) -> infinity; |
