diff options
| author | D Corbacho <diana@rabbitmq.com> | 2018-12-19 09:28:36 +0000 |
|---|---|---|
| committer | GitHub <noreply@github.com> | 2018-12-19 09:28:36 +0000 |
| commit | e8fb108131762a5511c7c6b091642f19cd29994b (patch) | |
| tree | 329afae94009e0db36884da2fc722d96cf4f52e6 /src | |
| parent | 6b73cadf47b2a23be04c3c25fed216f3e8240458 (diff) | |
| parent | 78eedc2323bdf318d0c340f0479ae8ea1065a7b2 (diff) | |
| download | rabbitmq-server-git-e8fb108131762a5511c7c6b091642f19cd29994b.tar.gz | |
Merge pull request #1803 from rabbitmq/vanlightly-bugs
Bugfixes
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 108 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_quorum_memory_manager.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 67 |
5 files changed, 134 insertions, 83 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f749d9f30e..996774fe35 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, consumer_mapping = ConsumerMapping} = State0) -> case QueueStates of #{Name := QState0} -> + QName = rabbit_quorum_queue:queue_name(QState0), case rabbit_quorum_queue:handle_event(Evt, QState0) of {{delivery, CTag, Msgs}, QState1} -> AckRequired = case maps:find(CTag, ConsumerMapping) of @@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, true -> QState1 end, - QName = rabbit_quorum_queue:queue_name(QState2), State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), @@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, %% TODO: this should use dtree:take/3 {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(Name, QNames) of - {ok, QName} -> erase_queue_stats(QName); - error -> ok - end, + erase_queue_stats(QName), noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), queue_names = maps:remove(Name, QNames)}) diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d365d41a96..8c7b208855 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -56,7 +56,7 @@ make_discard/2, make_credit/4, make_purge/0, - make_update_state/1 + make_update_config/1 ]). -type raw_msg() :: term(). @@ -131,7 +131,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). --record(update_state, {config :: config()}). +-record(update_config, {config :: config()}). @@ -143,7 +143,7 @@ #discard{} | #credit{} | #purge{} | - #update_state{}. + #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -260,10 +260,10 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_state(Conf, #state{name = Name, + update_config(Conf, #state{name = Name, queue_resource = Resource}). -update_state(Conf, State) -> +update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), @@ -435,18 +435,24 @@ apply(_, {down, ConsumerPid, noconnection}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), - % mark all consumers and enqueuers as suspect - % and monitor the node - {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - {maps:put(K, C#consumer{suspected_down = true, - checked_out = #{}}, - Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + % mark all consumers and enqueuers as suspected down + % and monitor the node so that we can find out the final state of the + % process at some later point + {Cons, State} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -515,8 +521,8 @@ apply(_, {nodeup, Node}, Effects0, service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}; -apply(_, #update_state{config = Conf}, Effects, State) -> - {update_state(Conf, State), Effects, ok}. +apply(_, #update_config{config = Conf}, Effects, State) -> + {update_config(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -587,7 +593,9 @@ overview(#state{consumers = Cons, get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> - [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)]; + [{K, snd(snd(maps:get(K, Checked)))} + || K <- lists:seq(From, To), + maps:is_key(K, Checked)]; _ -> [] end. @@ -769,16 +777,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, +return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = case Life of - auto -> - Num = length(MsgNumMsgs), - Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, Num)}; - once -> - Con0#consumer{checked_out = Checked} - end, + Con = Con0#consumer{checked_out = Checked, + credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> @@ -900,12 +902,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = lqueue:in(MsgNum, Returns)}). -return_all(State, Checked) -> - maps:fold(fun (_, '$prefix_msg', S) -> - return_one(0, '$prefix_msg', S); - (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, State, Checked). +return_all(State, Checked0) -> + %% need to sort the list so that we return messages in the order + %% they were checked out + Checked = lists:sort(maps:to_list(Checked0)), + lists:foldl(fun ({_, '$prefix_msg'}, S) -> + return_one(0, '$prefix_msg', S); + ({_, {MsgNum, Msg}}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -1170,9 +1175,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. --spec make_update_state(config()) -> protocol(). -make_update_state(Config) -> - #update_state{config = Config}. +-spec make_update_config(config()) -> protocol(). +make_update_config(Config) -> + #update_config{config = Config}. add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), @@ -1502,11 +1507,14 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> Node = node(Pid), {State0, Effects0} = enq(1, 1, second, test_init(test)), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), - {State1, Effects1} = check(Cid, 2, State0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), + %% validate consumer has credit {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), @@ -1865,6 +1873,26 @@ purge_with_checkout_test() -> ?assertEqual(0, maps:size(Checked)), ok. +down_returns_checked_out_in_order_test() -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#state.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2), + Returns = lqueue:to_list(S#state.returns), + ?assertEqual(100, length(Returns)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + meta(Idx) -> #{index => Idx, term => 1}. @@ -1900,7 +1928,7 @@ check_auto(Cid, Idx, State) -> check(Cid, Idx, Num, State) -> strip_reply( apply(meta(Idx), - make_checkout(Cid, {once, Num, simple_prefetch}, #{}), + make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), [], State)). settle(Cid, Idx, MsgId, State) -> diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 9cdb1dfbe7..955c0e4d9d 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -52,10 +52,12 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. +-type cluster_name() :: rabbit_types:r(queue). + -record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). --record(state, {cluster_name :: ra_cluster_name(), +-record(state, {cluster_name :: cluster_name(), servers = [] :: [ra_server_id()], leader :: maybe(ra_server_id()), next_seq = 0 :: seq(), @@ -88,7 +90,7 @@ %% @param ClusterName the id of the cluster to interact with %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. --spec init(ra_cluster_name(), [ra_server_id()]) -> state(). +-spec init(cluster_name(), [ra_server_id()]) -> state(). init(ClusterName, Servers) -> init(ClusterName, Servers, ?SOFT_LIMIT). @@ -98,7 +100,7 @@ init(ClusterName, Servers) -> %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). +-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, @@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) -> soft_limit = SoftLimit, timeout = Timeout}. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), +-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -397,12 +399,12 @@ purge(Node) -> end. %% @doc returns the cluster name --spec cluster_name(state()) -> ra_cluster_name(). +-spec cluster_name(state()) -> cluster_name(). cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. update_machine_state(Node, Conf) -> - case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of + case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of {ok, ok, _} -> ok; Err -> @@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, CDels0)}}; #consumer{last_msg_id = Prev} = C when FstId > Prev+1 -> + NumMissing = FstId - Prev + 1, + %% there may actually be fewer missing messages returned than expected + %% This can happen when a node the channel is on gets disconnected + %% from the node the leader is on and then reconnected afterwards. + %% When the node is disconnected the leader will return all checked + %% out messages to the main queue to ensure they don't get stuck in + %% case the node never comes back. Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, - length(IdMsgs) + length(Missing), + length(IdMsgs) + NumMissing, C, CDels0)}}; #consumer{last_msg_id = Prev} when FstId =< Prev -> @@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command, ok = ra:pipeline_command(Node, Command, Seq), State#state{pending = Pending#{Seq => {Correlation, Command}}}. -add_command(_Cid, _Tag, [], Acc) -> +add_command(_, _, [], Acc) -> Acc; -add_command(Cid, Tag, MsgIds, Acc) -> - [{Tag, MsgIds, Cid} | Acc]. +add_command(Cid, settle, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, return, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, discard, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl index 347f7f205e..f567561f31 100644 --- a/src/rabbit_quorum_memory_manager.erl +++ b/src/rabbit_quorum_memory_manager.erl @@ -15,7 +15,7 @@ %% -module(rabbit_quorum_memory_manager). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 2394822763..53dd2e2a7d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -54,10 +54,6 @@ {'ok', rabbit_fifo_client:state()}. -spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. --spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(), - rabbit_fifo_client:state()) -> - {'ok', 'empty', rabbit_fifo_client:state()} | - {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. @@ -82,6 +78,8 @@ open_files ]). +-define(TICK_TIME, 1000). %% the ra server tick time + %%---------------------------------------------------------------------------- -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> @@ -147,9 +145,11 @@ declare(#amqqueue{name = QName, ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -ra_machine_config(Q = #amqqueue{name = QName}) -> - #{dead_letter_handler => dlx_mfa(Q), +ra_machine_config(Q = #amqqueue{name = QName, + pid = {Name, _}}) -> + #{name => Name, queue_resource => QName, + dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> @@ -185,7 +185,8 @@ become_leader(QName, Name) -> end), case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{quorum_nodes = Nodes}} -> - [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName]) + [rpc:call(Node, ?MODULE, rpc_delete_metrics, + [QName], ?TICK_TIME) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -198,22 +199,29 @@ rpc_delete_metrics(QName) -> ok. update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> - R = reductions(Name), - rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), - Util = case C of - 0 -> 0; - _ -> rabbit_fifo:usage(Name) - end, - Infos = [{consumers, C}, {consumer_utilisation, Util}, - {message_bytes_ready, MsgBytesReady}, - {message_bytes_unacknowledged, MsgBytesUnack}, - {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)], - rabbit_core_metrics:queue_stats(QName, Infos), - rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, - {messages, M}, - {messages_ready, MR}, - {messages_unacknowledged, MU}, - {reductions, R}]). + %% this makes calls to remote processes so cannot be run inside the + %% ra server + _ = spawn(fun() -> + R = reductions(Name), + rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), + Util = case C of + 0 -> 0; + _ -> rabbit_fifo:usage(Name) + end, + Infos = [{consumers, C}, {consumer_utilisation, Util}, + {message_bytes_ready, MsgBytesReady}, + {message_bytes_unacknowledged, MsgBytesUnack}, + {message_bytes, MsgBytesReady + MsgBytesUnack} + | infos(QName)], + rabbit_core_metrics:queue_stats(QName, Infos), + rabbit_event:notify(queue_stats, + Infos ++ [{name, QName}, + {messages, M}, + {messages_ready, MR}, + {messages_unacknowledged, MU}, + {reductions, R}]) + end), + ok. reductions(Name) -> try @@ -268,7 +276,7 @@ stop(VHost) -> _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], ok. --spec delete(rabbit_types:amqqueue(), +-spec delete(#amqqueue{}, boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. @@ -286,7 +294,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, {'DOWN', MRef, process, _, _} -> ok end, - rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]), + rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?TICK_TIME), {ok, Msgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -322,6 +331,10 @@ reject(false, CTag, MsgIds, QState) -> credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). +-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(), + rabbit_fifo_client:state()) -> + {'ok', 'empty', rabbit_fifo_client:state()} | + {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, CTag0, QState0) -> CTag = quorum_ctag(CTag0), @@ -657,7 +670,7 @@ i(memory, #amqqueue{pid = {Name, _}}) -> end; i(state, #amqqueue{pid = {Name, Node}}) -> %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name]) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of {badrpc, _} -> down; State -> State end; @@ -706,7 +719,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). quorum_messages(QName) -> case ets:lookup(queue_coarse_metrics, QName) of |
