diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-21 14:06:58 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-21 14:06:58 +0100 |
| commit | f841d7a34810a89055096c2ec6ddeb73fbf30cf8 (patch) | |
| tree | f87ad0ec3d52afa37200c66184c11c749c23a52c /src | |
| parent | 1f052db8900e388535249603c28e44ee3e40a607 (diff) | |
| parent | 7e347885de96e95ce85fab577861b98aa0fa7dbf (diff) | |
| download | rabbitmq-server-git-f841d7a34810a89055096c2ec6ddeb73fbf30cf8.tar.gz | |
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
Conflicts:
src/rabbit_quorum_queue.erl
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 474 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 33 | ||||
| -rw-r--r-- | src/rabbit_quorum_memory_manager.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 71 |
6 files changed, 342 insertions, 264 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index aa9889aef6..704ead75a7 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -584,7 +584,17 @@ retry_wait(Q = #amqqueue{pid = QPid, name = Name, state = QState}, F, E, Retries {stopped, false} -> E({absent, Q, stopped}); _ -> - false = rabbit_mnesia:is_process_alive(QPid), + case rabbit_mnesia:is_process_alive(QPid) of + true -> + % rabbitmq-server#1682 + % The old check would have crashed here, + % instead, log it and run the exit fun. absent & alive is weird, + % but better than crashing with badmatch,true + rabbit_log:debug("Unexpected alive queue process ~p~n", [QPid]), + E({absent, Q, alive}); + false -> + ok % Expected result + end, timer:sleep(30), with(Name, F, E, RetriesLeft - 1) end. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f749d9f30e..d1f3b06528 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, consumer_mapping = ConsumerMapping} = State0) -> case QueueStates of #{Name := QState0} -> + QName = rabbit_quorum_queue:queue_name(QState0), case rabbit_quorum_queue:handle_event(Evt, QState0) of {{delivery, CTag, Msgs}, QState1} -> AckRequired = case maps:find(CTag, ConsumerMapping) of @@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, true -> QState1 end, - QName = rabbit_quorum_queue:queue_name(QState2), State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), @@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, %% TODO: this should use dtree:take/3 {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(Name, QNames) of - {ok, QName} -> erase_queue_stats(QName); - error -> ok - end, + erase_queue_stats(QName), noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), queue_names = maps:remove(Name, QNames)}) @@ -2530,9 +2527,10 @@ maybe_monitor_all([], S) -> S; %% optimisation maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). -add_delivery_count_header(MsgHeader, Msg) -> - Count = maps:get(delivery_count, MsgHeader, 0), - rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). +add_delivery_count_header(#{delivery_count := Count}, Msg) -> + rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg); +add_delivery_count_header(_, Msg) -> + Msg. qpid_to_ref(Pid) when is_pid(Pid) -> Pid; qpid_to_ref({Name, _}) -> Name; diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 73883aefde..706094037f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -20,13 +20,14 @@ -compile(inline_list_funcs). -compile(inline). +-compile({no_auto_import, [apply/3]}). -include_lib("ra/include/ra.hrl"). -include_lib("rabbit_common/include/rabbit.hrl"). -export([ init/1, - apply/4, + apply/3, state_enter/2, tick/2, overview/1, @@ -56,7 +57,7 @@ make_discard/2, make_credit/4, make_purge/0, - make_update_state/1 + make_update_config/1 ]). -type raw_msg() :: term(). @@ -131,7 +132,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). --record(update_state, {config :: config()}). +-record(update_config, {config :: config()}). @@ -143,7 +144,7 @@ #discard{} | #credit{} | #purge{} | - #update_state{}. + #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -265,10 +266,10 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_state(Conf, #state{name = Name, + update_config(Conf, #state{name = Name, queue_resource = Resource}). -update_state(Conf, State) -> +update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), @@ -284,55 +285,58 @@ zero(_) -> % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue -spec apply(ra_machine:command_meta_data(), command(), - ra_machine:effects(), state()) -> - {state(), ra_machine:effects(), Reply :: term()}. + state()) -> + {state(), Reply :: term(), ra_machine:effects()}. apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, - msg = RawMsg}, Effects0, State00) -> - case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of + msg = RawMsg}, State00) -> + case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State00) of {ok, State0, Effects1} -> - {State, Effects, ok} = checkout(add_bytes_enqueue(RawMsg, State0), - Effects1), - {append_to_master_index(RaftIdx, State), Effects, ok}; + %% need to checkout before capturing the shadow copy else + %% snapshots may not be complete + {State, ok, Effects} = checkout( + add_bytes_enqueue(RawMsg, State0), + Effects1), + append_to_master_index(RaftIdx, Effects, State); {duplicate, State, Effects} -> - {State, Effects, ok} + {State, ok, lists:reverse(Effects)} end; apply(#{index := RaftIdx}, - #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, + #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot % states taken need to includ them complete_and_checkout(RaftIdx, MsgIds, ConsumerId, - Con0, Effects0, State); + Con0, [], State); _ -> - {State, Effects0, ok} + {State, ok} + end; apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, - Effects0, #state{consumers = Cons0} = State0) -> + #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> - {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds, - ConsumerId, Con0, - Effects0, State0), Discarded = maps:with(MsgIds, Con0#consumer.checked_out), - {State, dead_letter_effects(Discarded, State, Effects), Res}; + Effects = dead_letter_effects(Discarded, State0, []), + complete_and_checkout(RaftIdx, MsgIds, ConsumerId, Con0, + Effects, State0); _ -> - {State0, Effects0, ok} + {State0, ok} end; -apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, +apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), MsgNumMsgs = maps:values(Returned), - return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State); + return(ConsumerId, MsgNumMsgs, Con0, Checked, [], State); _ -> - {State, Effects0, ok} + {State, ok} end; apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, - drain = Drain, consumer_id = ConsumerId}, Effects0, + drain = Drain, consumer_id = ConsumerId}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -344,16 +348,16 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, ServiceQueue = maybe_queue_consumer(ConsumerId, Con1, ServiceQueue0), Cons = maps:put(ConsumerId, Con1, Cons0), - {State1, Effects, ok} = + {State1, ok, Effects} = checkout(State0#state{service_queue = ServiceQueue, - consumers = Cons}, Effects0), + consumers = Cons}, []), Response = {send_credit_reply, maps:size(State1#state.messages)}, %% by this point all checkouts for the updated credit value %% should be processed so we can evaluate the drain case Drain of false -> %% just return the result of the checkout - {State1, Effects, Response}; + {State1, Response, Effects}; true -> Con = #consumer{credit = PostCred} = maps:get(ConsumerId, State1#state.consumers), @@ -366,108 +370,116 @@ apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, Drained = Con#consumer.credit, {CTag, _} = ConsumerId, {State1#state{consumers = Consumers}, - Effects, %% returning a multi response with two client actions %% for the channel to execute - {multi, [Response, {send_drained, [{CTag, Drained}]}]}} + {multi, [Response, {send_drained, [{CTag, Drained}]}]}, + Effects} end; _ -> %% credit for unknown consumer - just ignore - {State0, Effects0, ok} + {State0, ok} end; -apply(_, #checkout{spec = {dequeue, _}}, Effects0, +apply(_, #checkout{spec = {dequeue, _}}, #state{messages = M, prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? - {State0, Effects0, {dequeue, empty}}; + {State0, {dequeue, empty}}; apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, consumer_id = ConsumerId}, - Effects0, State0) -> + State0) -> % TODO: this clause could probably be optimised State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), % turn send msg effect into reply {success, _, MsgId, Msg, State2} = checkout_one(State1), % immediately settle - {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]), - Effects0, State2), - {State, Effects, {dequeue, {MsgId, Msg}}}; + {State, _, Effects} = apply(Meta, make_settle(ConsumerId, [MsgId]), State2), + {State, {dequeue, {MsgId, Msg}}, Effects}; apply(_, #checkout{spec = {dequeue, unsettled}, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, - Effects0, State0) -> + State0) -> State1 = update_consumer(ConsumerId, ConsumerMeta, {once, 1, simple_prefetch}, State0), - Effects1 = [{monitor, process, Pid} | Effects0], - {State, Reply, Effects} = case checkout_one(State1) of - {success, _, MsgId, Msg, S} -> - {S, {MsgId, Msg}, Effects1}; - {inactive, S} -> - {S, empty, [{aux, inactive} | Effects1]}; - S -> - {S, empty, Effects1} - end, - {State, Effects, {dequeue, Reply}}; -apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) -> - {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}), + case checkout_one(State1) of + {success, _, MsgId, Msg, S} -> + {S, {dequeue, {MsgId, Msg}}, [{monitor, process, Pid}]}; + {inactive, S} -> + {S, {dequeue, empty}, [{aux, inactive}]}; + S -> + {S, {dequeue, empty}} + end; +apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> + {CancelEffects, State1} = cancel_consumer(ConsumerId, {[], State0}), % TODO: here we should really demonitor the pid but _only_ if it has no % other consumers or enqueuers. checkout(State1, CancelEffects); -apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId}, - Effects0, State0) -> +apply(_, #checkout{spec = Spec, meta = Meta, + consumer_id = {_, Pid} = ConsumerId}, + State0) -> State1 = update_consumer(ConsumerId, Meta, Spec, State0), - {State, Effects, Res} = checkout(State1, Effects0), - {State, [{monitor, process, Pid} | Effects], Res}; -apply(#{index := RaftIdx}, #purge{}, Effects0, + checkout(State1, [{monitor, process, Pid}]); +apply(#{index := RaftIdx}, #purge{}, #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> Total = rabbit_fifo_index:size(Indexes), - {State1, Effects1, _} = + {State1, Effects1} = maps:fold( fun(ConsumerId, C = #consumer{checked_out = Checked0}, - {StateAcc0, EffectsAcc0, ok}) -> + {StateAcc0, EffectsAcc0}) -> MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} <- maps:values(Checked0)], complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, #{}, EffectsAcc0, StateAcc0) - end, {State0, Effects0, ok}, Cons0), - {State, Effects, _} = + end, {State0, []}, Cons0), + {State, _, Effects} = update_smallest_raft_index( RaftIdx, Indexes, State1#state{ra_indexes = rabbit_fifo_index:empty(), messages = #{}, returns = lqueue:new(), + msg_bytes_enqueue = 0, + msg_bytes_checkout = 0, low_msg_num = undefined}, Effects1), - {State, [garbage_collection | Effects], {purge, Total}}; + %% as we're not checking out after a purge (no point) we have to + %% reverse the effects ourselves + {State, {purge, Total}, + lists:reverse([garbage_collection | Effects])}; apply(_, {down, ConsumerPid, noconnection}, - Effects0, #state{consumers = Cons0, + #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> Node = node(ConsumerPid), - % mark all consumers and enqueuers as suspect - % and monitor the node - {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, - {Co, St0}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - {maps:put(K, C#consumer{suspected_down = true, - checked_out = #{}}, - Co), - St}; - (K, C, {Co, St}) -> - {maps:put(K, C, Co), St} - end, {#{}, State0}, Cons0), + % mark all consumers and enqueuers as suspected down + % and monitor the node so that we can find out the final state of the + % process at some later point + {Cons, State} = maps:fold( + fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + %% TODO: need to increment credit here + %% with the size of the Checked map + Credit = increase_credit(C, maps:size(Checked0)), + {maps:put(K, C#consumer{suspected_down = true, + credit = Credit, + checked_out = #{}}, Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E end, Enqs0), Effects = case maps:size(Cons) of 0 -> - [{aux, inactive}, {monitor, node, Node} | Effects0]; + [{aux, inactive}, {monitor, node, Node}]; _ -> - [{monitor, node, Node} | Effects0] + [{monitor, node, Node}] end, - {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; -apply(_, {down, Pid, _Info}, Effects0, - #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + %% TODO: should we run a checkout here? + {State#state{consumers = Cons, enqueuers = Enqs}, ok, Effects}; +apply(_, {down, Pid, _Info}, #state{consumers = Cons0, + enqueuers = Enqs0} = State0) -> % Remove any enqueuer for the same pid and enqueue any pending messages % This should be ok as we won't see any more enqueues from this pid State1 = case maps:take(Pid, Enqs0) of @@ -482,13 +494,12 @@ apply(_, {down, Pid, _Info}, Effects0, % Find the consumers for the down pid DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {Effects0, State1}, + {Effects1, State2} = lists:foldl(fun cancel_consumer/2, {[], State1}, DownConsumers), checkout(State2, Effects1); -apply(_, {nodeup, Node}, Effects0, - #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> +apply(_, {nodeup, Node}, #state{consumers = Cons0, + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -516,14 +527,14 @@ apply(_, {nodeup, Node}, Effects0, CAcc, SQAcc, EAcc); (_, _, Acc) -> Acc - end, {Cons0, SQ0, Effects0}, Cons0), + end, {Cons0, SQ0, Monitors}, Cons0), % TODO: avoid list concat checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, - service_queue = SQ}, Monitors ++ Effects); -apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}; -apply(_, #update_state{config = Conf}, Effects, State) -> - {update_state(Conf, State), Effects, ok}. + service_queue = SQ}, Effects); +apply(_, {nodedown, _Node}, State) -> + {State, ok}; +apply(_, #update_config{config = Conf}, State) -> + {update_config(Conf, State), ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -594,7 +605,9 @@ overview(#state{consumers = Cons, get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> case Consumers of #{Cid := #consumer{checked_out = Checked}} -> - [{K, snd(snd(maps:get(K, Checked)))} || K <- lists:seq(From, To)]; + [{K, snd(snd(maps:get(K, Checked)))} + || K <- lists:seq(From, To), + maps:is_key(K, Checked)]; _ -> [] end. @@ -756,14 +769,6 @@ cancel_consumer0(ConsumerId, {Effects0, S0} end. -incr_enqueue_count(#state{enqueue_count = C, - shadow_copy_interval = C} = State0) -> - % time to stash a dehydrated state version - State = State0#state{enqueue_count = 0}, - {State, dehydrate_state(State)}; -incr_enqueue_count(#state{enqueue_count = C} = State) -> - {State#state{enqueue_count = C + 1}, undefined}. - enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = LowMsgNum, next_msg_num = NextMsgNum} = State0) -> @@ -774,11 +779,20 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages, low_msg_num = min(LowMsgNum, NextMsgNum), next_msg_num = NextMsgNum + 1}. -append_to_master_index(RaftIdx, +append_to_master_index(RaftIdx, Effects, #state{ra_indexes = Indexes0} = State0) -> {State, Shadow} = incr_enqueue_count(State0), Indexes = rabbit_fifo_index:append(RaftIdx, Shadow, Indexes0), - State#state{ra_indexes = Indexes}. + {State#state{ra_indexes = Indexes}, ok, Effects}. + +incr_enqueue_count(#state{enqueue_count = C, + shadow_copy_interval = C} = State0) -> + % time to stash a dehydrated state version + State = State0#state{enqueue_count = 0}, + {State, dehydrate_state(State)}; +incr_enqueue_count(#state{enqueue_count = C} = State) -> + {State#state{enqueue_count = C + 1}, undefined}. + enqueue_pending(From, #enqueuer{next_seqno = Next, @@ -824,16 +838,10 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, +return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = case Life of - auto -> - Num = length(MsgNumMsgs), - Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, Num)}; - once -> - Con0#consumer{checked_out = Checked} - end, + Con = Con0#consumer{checked_out = Checked, + credit = increase_credit(Con0, length(MsgNumMsgs))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> @@ -859,7 +867,7 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), {State0#state{consumers = Cons, ra_indexes = Indexes, - service_queue = SQ}, Effects, ok}. + service_queue = SQ}, Effects}. increase_credit(#consumer{lifetime = once, credit = Credit}, _) -> @@ -886,10 +894,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, end, State0, maps:values(Discarded)), %% need to pass the length of discarded as $prefix_msgs would be filtered %% by the above list comprehension - {State2, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, - maps:size(Discarded), - Con0, Checked, Effects0, State1), - {State, Effects, _} = checkout(State2, Effects1), + {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, + maps:size(Discarded), + Con0, Checked, Effects0, State1), + {State, ok, Effects} = checkout(State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). @@ -918,7 +926,7 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, % there are no messages on queue anymore and no pending enqueues % we can forward release_cursor all the way until % the last received command - {State, [{release_cursor, IncomingRaftIdx, State} | Effects], ok}; + {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]}; _ -> NewSmallest = rabbit_fifo_index:smallest(Indexes), % Take the smallest raft index available in the index when starting @@ -927,15 +935,15 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, {{Smallest, _}, {Smallest, _}} -> % smallest has not changed, do not issue release cursor % effects - {State, Effects, ok}; + {State, ok, Effects}; {_, {Smallest, Shadow}} when Shadow =/= undefined -> % ?INFO("RELEASE ~w ~w ~w~n", [IncomingRaftIdx, Smallest, % Shadow]), - {State, [{release_cursor, Smallest, Shadow} | Effects], ok}; + {State, ok, [{release_cursor, Smallest, Shadow}]}; _ -> % smallest % no shadow taken for this index, % no release cursor increase - {State, Effects, ok} + {State, ok, Effects} end end. @@ -955,13 +963,18 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = lqueue:in(MsgNum, Returns)}). -return_all(State, Checked) -> - maps:fold(fun (_, '$prefix_msg', S) -> - return_one(0, '$prefix_msg', S); - (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, State, Checked). - +return_all(State, Checked0) -> + %% need to sort the list so that we return messages in the order + %% they were checked out + Checked = lists:sort(maps:to_list(Checked0)), + lists:foldl(fun ({_, '$prefix_msg'}, S) -> + return_one(0, '$prefix_msg', S); + ({_, {MsgNum, Msg}}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). + +%% checkout new messages to consumers +%% reverses the effects list checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -973,10 +986,10 @@ checkout0({success, ConsumerId, MsgId, Msg, State}, Effects, Acc0) -> checkout0(checkout_one(State), Effects, Acc); checkout0({inactive, State}, Effects0, Acc) -> Effects = append_send_msg_effects(Effects0, Acc), - {State, [{aux, inactive} | Effects], ok}; + {State, ok, lists:reverse([{aux, inactive} | Effects])}; checkout0(State, Effects0, Acc) -> Effects = append_send_msg_effects(Effects0, Acc), - {State, Effects, ok}. + {State, ok, lists:reverse(Effects)}. append_send_msg_effects(Effects, AccMap) when map_size(AccMap) == 0 -> Effects; @@ -1244,9 +1257,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. --spec make_update_state(config()) -> protocol(). -make_update_state(Config) -> - #update_state{config = Config}. +-spec make_update_config(config()) -> protocol(). +make_update_config(Config) -> + #update_config{config = Config}. add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), @@ -1308,10 +1321,10 @@ enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), - {_State3, Effects, _} = + {_State3, _, Effects} = apply(meta(3), make_checkout(Cid, {once, 2, simple_prefetch}, #{}), - [], State2), + State2), ?ASSERT_EFF({monitor, _, _}, Effects), ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), ok. @@ -1320,9 +1333,8 @@ credit_enq_enq_checkout_settled_credit_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), - {State3, Effects, _} = - apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), - [], State2), + {State3, _, Effects} = + apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), State2), ?ASSERT_EFF({monitor, _, _}, Effects), Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; (_) -> false @@ -1353,12 +1365,12 @@ credit_with_drained_test() -> %% checkout with a single credit {State1, _, _} = apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}), - [], State0), + State0), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, delivery_count = 0}}}, State1), - {State, _Effs, Result} = - apply(meta(3), make_credit(Cid, 0, 5, true), [], State1), + {State, Result, _} = + apply(meta(3), make_credit(Cid, 0, 5, true), State1), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), @@ -1372,14 +1384,14 @@ credit_and_drain_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), %% checkout without any initial credit (like AMQP 1.0 would) - {State3, CheckEffs, _} = + {State3, _, CheckEffs} = apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}), - [], State2), + State2), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), - {State4, Effects, {multi, [{send_credit_reply, 0}, - {send_drained, [{?FUNCTION_NAME, 2}]}]}} = - apply(meta(4), make_credit(Cid, 4, 0, true), [], State3), + {State4, {multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 2}]}]}, + Effects} = apply(meta(4), make_credit(Cid, 4, 0, true), State3), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), @@ -1397,9 +1409,9 @@ enq_enq_deq_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = + {_State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State2), + State2), ok. enq_enq_deq_deq_settle_test() -> @@ -1407,39 +1419,38 @@ enq_enq_deq_deq_settle_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), % get returns a reply value - {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = + {State3, {dequeue, {0, {_, first}}}, [{monitor, _, _}]} = apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State2), - {_State4, _Effects4, {dequeue, empty}} = + State2), + {_State4, {dequeue, empty}, _} = apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State3), + State3), ok. enq_enq_checkout_get_settled_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), % get returns a reply value - {_State2, _Effects, {dequeue, {0, {_, first}}}} = + {_State2, {dequeue, {0, {_, first}}}, _Effs} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + State1), ok. checkout_get_empty_test() -> Cid = {?FUNCTION_NAME, self()}, State = test_init(test), - {_State2, [], {dequeue, empty}} = - apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), - [], State), + {_State2, {dequeue, empty}} = + apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), State), ok. untracked_enq_deq_test() -> Cid = {?FUNCTION_NAME, self()}, State0 = test_init(test), {State1, _, _} = apply(meta(1), - make_enqueue(undefined, undefined, first), [], State0), - {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + make_enqueue(undefined, undefined, first), + State0), + {_State2, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. release_cursor_test() -> @@ -1512,18 +1523,18 @@ return_non_existent_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), % return non-existent - {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0), + {_State2, _} = apply(meta(3), make_return(Cid, [99]), State0), ok. return_checked_out_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, [_Monitor, {aux, active}, - {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} = - check(Cid, 2, State0), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, + {aux, active} + ]} = check(Cid, 2, State0), % return - {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]), - [], State1), + {_State2, _, [_, _]} = apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1532,11 +1543,13 @@ return_auto_checked_out_test() -> {State0, [_]} = enq(2, 2, second, State00), % it first active then inactive as the consumer took on but cannot take % any more - {State1, [_Monitor, {aux, inactive}, {aux, active}, - {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} = - check_auto(Cid, 2, State0), + {State1, [_Monitor, + {send_msg, _, {delivery, _, [{MsgId, _}]}, _}, + {aux, active}, + {aux, inactive} + ]} = check_auto(Cid, 2, State0), % return should include another delivery - {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1), + {_State2, _, Effects} = apply(meta(3), make_return(Cid, [MsgId]), State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, Effects), @@ -1549,22 +1562,21 @@ cancelled_checkout_out_test() -> {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue - {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), - [], State1), + {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), ?assertEqual(2, maps:size(State2#state.messages)), - {State3, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2), + {State3, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), ?debugFmt("State3 ~p", [State3]), - {_State, _, {dequeue, {_, {_, second}}}} = - apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3), + {_State, {dequeue, {_, {_, second}}}, _} = + apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), ok. down_with_noproc_consumer_returns_unsettled_test() -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), - {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), ok. @@ -1576,17 +1588,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> Node = node(Pid), {State0, Effects0} = enq(1, 1, second, test_init(test)), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects0), - {State1, Effects1} = check(Cid, 2, State0), + {State1, Effects1} = check_auto(Cid, 2, State0), + #consumer{credit = 0} = maps:get(Cid, State1#state.consumers), ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects1), % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node - {State2a, _Effects2a, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), - {State2, Effects2, _} = apply(meta(3), {down, Self, noconnection}, [], State2a), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), + #consumer{credit = 1} = maps:get(Cid, State2a#state.consumers), + %% validate consumer has credit + {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), % when the node comes up we need to retry the process monitors for the % disconnected processes - {_State3, Effects3, _} = apply(meta(3), {nodeup, Node}, [], State2), + {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), % try to re-monitor the suspect processes ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), @@ -1601,7 +1616,7 @@ down_with_noconnection_returns_unack_test() -> {State1, {_, _}} = deq(2, Cid, unsettled, State0), ?assertEqual(0, maps:size(State1#state.messages)), ?assertEqual(0, lqueue:len(State1#state.returns)), - {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), ?assertEqual(1, maps:size(State2a#state.messages)), ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. @@ -1609,9 +1624,9 @@ down_with_noconnection_returns_unack_test() -> down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), - {State0, Effects0, _} = apply(meta(1), {enqueue, Pid, 1, first}, [], State00), + {State0, _, Effects0} = apply(meta(1), {enqueue, Pid, 1, first}, State00), ?ASSERT_EFF({monitor, process, _}, Effects0), - {State1, _Effects1, _} = apply(meta(3), {down, Pid, noproc}, [], State0), + {State1, _, _} = apply(meta(3), {down, Pid, noproc}, State0), % ensure there are no enqueuers ?assert(0 =:= maps:size(State1#state.enqueuers)), ok. @@ -1633,7 +1648,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1), + {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -1650,8 +1665,7 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), - [], State1), + {_State2, _, Effects2} = apply(meta(1), make_discard(Cid, [0]), State1), % assert mod call effect with appended reason and message ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, Effects2), @@ -1664,7 +1678,7 @@ tick_test() -> {S1, _} = enq(2, 2, <<"snd">>, S0), {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), {S3, {_, _}} = deq(4, Cid2, unsettled, S2), - {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3), + {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), S3), [{mod_call, _, _, [#resource{}, @@ -1840,8 +1854,8 @@ run_snapshot_test0(Name, Commands) -> ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", - % [Name, SnapIdx, S, State, SnapState, Filtered]), + ?debugFmt("Name ~p~nS~p~nState~p~nn", + [Name, S, State]), ?assertEqual(State, S) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -1874,10 +1888,9 @@ pending_enqueue_is_enqueued_on_down_test() -> Cid = {<<"cid">>, self()}, Pid = self(), {State0, _} = enq(1, 2, first, test_init(test)), - {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0), - {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), - [], State1), + {State1, _, _} = apply(meta(2), {down, Pid, noproc}, State0), + {_State2, {dequeue, {0, {_, first}}}, _} = + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State1), ok. duplicate_delivery_test() -> @@ -1922,76 +1935,105 @@ state_enter_montors_and_notifications_test() -> purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1), + {State2, {purge, 1}, _} = apply(meta(2), make_purge(), State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value - {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} = - apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3), + {_State4, {dequeue, {0, {_, second}}}, [{monitor, _, _}]} = + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), State3), ok. purge_with_checkout_test() -> Cid = {<<"purge_test">>, self()}, {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), - {State1, _} = enq(2, 1, first, State0), - {State2, _} = enq(3, 2, second, State1), - {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2), + {State1, _} = enq(2, 1, <<"first">>, State0), + {State2, _} = enq(3, 2, <<"second">>, State1), + %% assert message bytes are non zero + ?assert(State2#state.msg_bytes_checkout > 0), + ?assert(State2#state.msg_bytes_enqueue > 0), + {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2), + ?assertEqual(0, State3#state.msg_bytes_checkout), + ?assertEqual(0, State3#state.msg_bytes_enqueue), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), ?assertEqual(0, maps:size(Checked)), ok. +down_returns_checked_out_in_order_test() -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#state.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), + Returns = lqueue:to_list(S#state.returns), + ?assertEqual(100, length(Returns)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + meta(Idx) -> #{index => Idx, term => 1}. enq(Idx, MsgSeq, Msg, State) -> strip_reply( - apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)). + apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), State)). deq(Idx, Cid, Settlement, State0) -> - {State, _, {dequeue, Msg}} = + {State, {dequeue, Msg}, _} = apply(meta(Idx), - make_checkout(Cid, {dequeue, Settlement}, #{}), - [], State0), + make_checkout(Cid, {dequeue, Settlement}, #{}), + State0), {State, Msg}. check_n(Cid, Idx, N, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {auto, N, simple_prefetch}, #{}), - [], State)). + State)). check(Cid, Idx, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {once, 1, simple_prefetch}, #{}), - [], State)). + State)). check_auto(Cid, Idx, State) -> strip_reply( apply(meta(Idx), make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), - [], State)). + State)). check(Cid, Idx, Num, State) -> strip_reply( apply(meta(Idx), - make_checkout(Cid, {once, Num, simple_prefetch}, #{}), - [], State)). + make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), + State)). settle(Cid, Idx, MsgId, State) -> - strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)). + strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), State)). credit(Cid, Idx, Credit, DelCnt, Drain, State) -> strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain), - [], State)). + State)). -strip_reply({State, Effects, _Reply}) -> +strip_reply({State, _, Effects}) -> {State, Effects}. run_log(InitState, Entries) -> lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> - case apply(meta(Idx), E, Efx0, Acc0) of - {Acc, Efx, _} -> - {Acc, Efx} + case apply(meta(Idx), E, Acc0) of + {Acc, _, Efx} when is_list(Efx) -> + {Acc, Efx0 ++ Efx}; + {Acc, _, Efx} -> + {Acc, Efx0 ++ [Efx]}; + {Acc, _} -> + {Acc, Efx0} end end, {InitState, []}, Entries). diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 9cdb1dfbe7..955c0e4d9d 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -52,10 +52,12 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. +-type cluster_name() :: rabbit_types:r(queue). + -record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). --record(state, {cluster_name :: ra_cluster_name(), +-record(state, {cluster_name :: cluster_name(), servers = [] :: [ra_server_id()], leader :: maybe(ra_server_id()), next_seq = 0 :: seq(), @@ -88,7 +90,7 @@ %% @param ClusterName the id of the cluster to interact with %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. --spec init(ra_cluster_name(), [ra_server_id()]) -> state(). +-spec init(cluster_name(), [ra_server_id()]) -> state(). init(ClusterName, Servers) -> init(ClusterName, Servers, ?SOFT_LIMIT). @@ -98,7 +100,7 @@ init(ClusterName, Servers) -> %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). +-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, @@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) -> soft_limit = SoftLimit, timeout = Timeout}. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), +-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -397,12 +399,12 @@ purge(Node) -> end. %% @doc returns the cluster name --spec cluster_name(state()) -> ra_cluster_name(). +-spec cluster_name(state()) -> cluster_name(). cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. update_machine_state(Node, Conf) -> - case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of + case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of {ok, ok, _} -> ok; Err -> @@ -620,11 +622,18 @@ handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, CDels0)}}; #consumer{last_msg_id = Prev} = C when FstId > Prev+1 -> + NumMissing = FstId - Prev + 1, + %% there may actually be fewer missing messages returned than expected + %% This can happen when a node the channel is on gets disconnected + %% from the node the leader is on and then reconnected afterwards. + %% When the node is disconnected the leader will return all checked + %% out messages to the main queue to ensure they don't get stuck in + %% case the node never comes back. Missing = get_missing_deliveries(Leader, Prev+1, FstId-1, Tag), Del = {delivery, Tag, Missing ++ IdMsgs}, {Del, State0#state{consumer_deliveries = update_consumer(Tag, LastId, - length(IdMsgs) + length(Missing), + length(IdMsgs) + NumMissing, C, CDels0)}}; #consumer{last_msg_id = Prev} when FstId =< Prev -> @@ -714,7 +723,11 @@ resend_command(Node, Correlation, Command, ok = ra:pipeline_command(Node, Command, Seq), State#state{pending = Pending#{Seq => {Correlation, Command}}}. -add_command(_Cid, _Tag, [], Acc) -> +add_command(_, _, [], Acc) -> Acc; -add_command(Cid, Tag, MsgIds, Acc) -> - [{Tag, MsgIds, Cid} | Acc]. +add_command(Cid, settle, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, return, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]; +add_command(Cid, discard, MsgIds, Acc) -> + [rabbit_fifo:make_settle(Cid, MsgIds) | Acc]. diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl index 347f7f205e..f567561f31 100644 --- a/src/rabbit_quorum_memory_manager.erl +++ b/src/rabbit_quorum_memory_manager.erl @@ -15,7 +15,7 @@ %% -module(rabbit_quorum_memory_manager). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 8672a01ce8..ce5b4b5b6b 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -55,10 +55,6 @@ {'ok', rabbit_fifo_client:state()}. -spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. --spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(), - rabbit_fifo_client:state()) -> - {'ok', 'empty', rabbit_fifo_client:state()} | - {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. @@ -83,6 +79,8 @@ open_files ]). +-define(TICK_TIME, 1000). %% the ra server tick time + %%---------------------------------------------------------------------------- -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> @@ -148,9 +146,11 @@ declare(#amqqueue{name = QName, ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -ra_machine_config(Q = #amqqueue{name = QName}) -> - #{dead_letter_handler => dlx_mfa(Q), +ra_machine_config(Q = #amqqueue{name = QName, + pid = {Name, _}}) -> + #{name => Name, queue_resource => QName, + dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}, single_active_consumer_on => single_active_consumer_on(Q)}. @@ -165,13 +165,15 @@ cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); false -> + %% this could potentially block for a while if the node is + %% in disconnected state or tcp buffers are full rpc:cast(Node, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]) end. cancel_consumer(QName, ChPid, ConsumerTag) -> - rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), + catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName), rabbit_event:notify(consumer_deleted, [{consumer_tag, ConsumerTag}, {channel, ChPid}, @@ -193,7 +195,8 @@ become_leader(QName, Name) -> end), case rabbit_amqqueue:lookup(QName) of {ok, #amqqueue{quorum_nodes = Nodes}} -> - [rpc:call(Node, ?MODULE, rpc_delete_metrics, [QName]) + [rpc:call(Node, ?MODULE, rpc_delete_metrics, + [QName], ?TICK_TIME) || Node <- Nodes, Node =/= node()]; _ -> ok @@ -206,22 +209,29 @@ rpc_delete_metrics(QName) -> ok. update_metrics(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> - R = reductions(Name), - rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), - Util = case C of - 0 -> 0; - _ -> rabbit_fifo:usage(Name) - end, - Infos = [{consumers, C}, {consumer_utilisation, Util}, - {message_bytes_ready, MsgBytesReady}, - {message_bytes_unacknowledged, MsgBytesUnack}, - {message_bytes, MsgBytesReady + MsgBytesUnack} | infos(QName)], - rabbit_core_metrics:queue_stats(QName, Infos), - rabbit_event:notify(queue_stats, Infos ++ [{name, QName}, - {messages, M}, - {messages_ready, MR}, - {messages_unacknowledged, MU}, - {reductions, R}]). + %% this makes calls to remote processes so cannot be run inside the + %% ra server + _ = spawn(fun() -> + R = reductions(Name), + rabbit_core_metrics:queue_stats(QName, MR, MU, M, R), + Util = case C of + 0 -> 0; + _ -> rabbit_fifo:usage(Name) + end, + Infos = [{consumers, C}, {consumer_utilisation, Util}, + {message_bytes_ready, MsgBytesReady}, + {message_bytes_unacknowledged, MsgBytesUnack}, + {message_bytes, MsgBytesReady + MsgBytesUnack} + | infos(QName)], + rabbit_core_metrics:queue_stats(QName, Infos), + rabbit_event:notify(queue_stats, + Infos ++ [{name, QName}, + {messages, M}, + {messages_ready, MR}, + {messages_unacknowledged, MU}, + {reductions, R}]) + end), + ok. reductions(Name) -> try @@ -276,7 +286,7 @@ stop(VHost) -> _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], ok. --spec delete(rabbit_types:amqqueue(), +-spec delete(#amqqueue{}, boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. @@ -294,7 +304,8 @@ delete(#amqqueue{type = quorum, pid = {Name, _}, {'DOWN', MRef, process, _, _} -> ok end, - rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName]), + rpc:call(LeaderNode, rabbit_core_metrics, queue_deleted, [QName], + ?TICK_TIME), {ok, Msgs}; {error, {no_more_servers_to_try, Errs}} -> case lists:all(fun({{error, noproc}, _}) -> true; @@ -330,6 +341,10 @@ reject(false, CTag, MsgIds, QState) -> credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). +-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(), + rabbit_fifo_client:state()) -> + {'ok', 'empty', rabbit_fifo_client:state()} | + {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, CTag0, QState0) -> CTag = quorum_ctag(CTag0), @@ -665,7 +680,7 @@ i(memory, #amqqueue{pid = {Name, _}}) -> end; i(state, #amqqueue{pid = {Name, Node}}) -> %% Check against the leader or last known leader - case rpc:call(Node, ?MODULE, cluster_state, [Name]) of + case rpc:call(Node, ?MODULE, cluster_state, [Name], ?TICK_TIME) of {badrpc, _} -> down; State -> State end; @@ -714,7 +729,7 @@ format(#amqqueue{quorum_nodes = Nodes} = Q) -> [{members, Nodes}, {online, online(Q)}, {leader, leader(Q)}]. is_process_alive(Name, Node) -> - erlang:is_pid(rpc:call(Node, erlang, whereis, [Name])). + erlang:is_pid(rpc:call(Node, erlang, whereis, [Name], ?TICK_TIME)). quorum_messages(QName) -> case ets:lookup(queue_coarse_metrics, QName) of |
