diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-07 15:28:26 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-07 15:50:09 +0000 |
| commit | 4fa8cc8678519bb2a618fff670b761858a2f3807 (patch) | |
| tree | dd8b669f7c16fd0f56cf2c41dc23e37921da4caf /src | |
| parent | def0a53d821e7c99a9b59c89e9bbcf1baf23f594 (diff) | |
| download | rabbitmq-server-git-4fa8cc8678519bb2a618fff670b761858a2f3807.tar.gz | |
Add rabbit_fifo snapshot property test
And fix various subtle bugs around snapshotting.
Diffstat (limited to 'src')
| -rw-r--r-- | src/lqueue.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 134 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 52 |
3 files changed, 120 insertions, 74 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 0652061075..1abe4e0b82 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -21,7 +21,7 @@ %% is an O(1) operation, in contrast with queue:len/1 which is O(n). -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). + foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]). -define(QUEUE, queue). @@ -32,6 +32,7 @@ -type result() :: 'empty' | {'value', value()}. -spec new() -> ?MODULE(). +-spec drop(?MODULE()) -> ?MODULE(). -spec is_empty(?MODULE()) -> boolean(). -spec len(?MODULE()) -> non_neg_integer(). -spec in(value(), ?MODULE()) -> ?MODULE(). @@ -48,6 +49,8 @@ new() -> {0, ?QUEUE:new()}. +drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}. + is_empty({0, _Q}) -> true; is_empty(_) -> false. @@ -81,7 +84,8 @@ foldr(Fun, Init, Q) -> {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) end. -len({L, _Q}) -> L. +len({L, _}) -> L. + peek({ 0, _Q}) -> empty; peek({_L, Q}) -> ?QUEUE:peek(Q). diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 740c6e202c..00d0db0b8a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -163,7 +163,7 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = queue:new() :: queue:queue(msg_in_id()), + returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -196,7 +196,8 @@ %% it instead takes messages from the `messages' map. %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msg_count = 0 :: non_neg_integer() + prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), + PrefixMsgs :: non_neg_integer()} }). -opaque state() :: #state{}. @@ -279,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0, #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = [M || M <- maps:values(Returned)], + MsgNumMsgs = maps:values(Returned), return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State); _ -> {State, Effects0, ok} @@ -329,8 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, end; apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, #state{messages = M, - prefix_msg_count = 0} = State0) when map_size(M) == 0 -> - %% FIX: also check if there are returned messages + prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> + %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, @@ -382,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0, RaftIdx, Indexes, State1#state{ra_indexes = rabbit_fifo_index:empty(), messages = #{}, - returns = queue:new(), + returns = lqueue:new(), low_msg_num = undefined}, Effects1), {State, [garbage_collection | Effects], {purge, Total}}; apply(_, {down, ConsumerPid, noconnection}, @@ -476,7 +477,7 @@ apply(_, {update_state, Conf}, Effects, State) -> state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, name = Name, - prefix_msg_count = 0, + prefix_msg_counts = {0, 0}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), @@ -489,10 +490,10 @@ state_enter(leader, #state{consumers = Cons, {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; -state_enter(recovered, #state{prefix_msg_count = PrefixMsgCount}) - when PrefixMsgCount =/= 0 -> +state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) + when PrefixMsgCounts =/= {0, 0} -> %% TODO: remove assertion? - exit({rabbit_fifo, unexpected_prefix_msg_count, PrefixMsgCount}); + exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; @@ -722,9 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg', - #state{prefix_msg_count = MsgCount} = S0) -> - S0#state{prefix_msg_count = MsgCount + 1}; + State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> + return_one(0, Msg, S0); ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) end, State0, MsgNumMsgs), @@ -826,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, end. % TODO update message then update messages and returns in single operations +return_one(0, '$prefix_msg', + #state{returns = Returns} = State0) -> + State0#state{returns = lqueue:in('$prefix_msg', Returns)}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{messages = Messages, returns = Returns} = State0) -> @@ -835,12 +838,11 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = queue:in(MsgNum, Returns)}. + returns = lqueue:in(MsgNum, Returns)}. return_all(State, Checked) -> - maps:fold(fun (_, '$prefix_msg', - #state{prefix_msg_count = MsgCount} = S) -> - S#state{prefix_msg_count = MsgCount + 1}; + maps:fold(fun (_, '$prefix_msg', S) -> + return_one(0, '$prefix_msg', S); (_, {MsgNum, Msg}, S) -> return_one(MsgNum, Msg, S) end, State, Checked). @@ -869,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. +next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State) + when PReturns > 0 -> + %% there are prefix returns, these should be served first + {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}}; next_checkout_message(#state{returns = Returns, low_msg_num = Low0, + prefix_msg_counts = {R, P}, next_msg_num = NextMsgNum} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue - case queue:peek(Returns) of - empty -> + case lqueue:peek(Returns) of + {value, Next} -> + {Next, State#state{returns = lqueue:drop(Returns)}}; + empty when P == 0 -> case Low0 of undefined -> {undefined, State}; @@ -888,25 +897,32 @@ next_checkout_message(#state{returns = Returns, {Low0, State#state{low_msg_num = Low}} end end; - {value, Next} -> - {Next, State#state{returns = queue:drop(Returns)}} + empty -> + %% There are prefix msgs + {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}} end. -take_next_msg(#state{prefix_msg_count = 0, - messages = Messages0} = State0) -> - {NextMsgInId, State} = next_checkout_message(State0), - %% messages are available - case maps:take(NextMsgInId, Messages0) of - {IdxMsg, Messages} -> - {{NextMsgInId, IdxMsg}, State, Messages, 0}; - error -> - error - end; -take_next_msg(#state{prefix_msg_count = MsgCount, - messages = Messages} = State) -> - %% there is still a prefix message count for the consumer - %% "fake" a '$prefix_msg' message - {'$prefix_msg', State, Messages, MsgCount - 1}. +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#state{messages = Messages0} = State0) -> + case next_checkout_message(State0) of + {'$prefix_msg', State} -> + {'$prefix_msg', State, Messages0}; + {NextMsgInId, State} -> + %% messages are available + case maps:take(NextMsgInId, Messages0) of + {IdxMsg, Messages} -> + {{NextMsgInId, IdxMsg}, State, Messages}; + error -> + error + end + end. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. @@ -917,7 +933,7 @@ checkout_one(#state{service_queue = SQ0, case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of - {ConsumerMsg, State0, Messages, PrefMsgC} -> + {ConsumerMsg, State0, Messages} -> SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout @@ -943,7 +959,6 @@ checkout_one(#state{service_queue = SQ0, Cons0, SQ1, []), State = State0#state{service_queue = SQ, messages = Messages, - prefix_msg_count = PrefMsgC, consumers = Cons}, Msg = case ConsumerMsg of '$prefix_msg' -> '$prefix_msg'; @@ -1034,19 +1049,26 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#state{messages = Messages0, +dehydrate_state(#state{messages = Messages, consumers = Consumers, - prefix_msg_count = MsgCount} = State) -> + returns = Returns, + prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) -> + %% TODO: optimise to avoid having to iterate the queue to get the number + %% of current returned messages + RetLen = lqueue:len(Returns), % O(1) + CurReturns = length([R || R <- lqueue:to_list(Returns), + R =/= '$prefix_msg']), + PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns, State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) - % C#consumer{checked_out = #{}} end, Consumers), - returns = queue:new(), + returns = lqueue:new(), %% messages include returns - prefix_msg_count = maps:size(Messages0) + MsgCount}. + prefix_msg_counts = {RetLen + PrefRetCnt, + PrefixMsgCnt}}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), @@ -1358,13 +1380,13 @@ down_with_noconnection_returns_unack_test() -> Cid = {<<"down_with_noconnect">>, Pid}, {State0, _} = enq(1, 1, second, test_init(test)), ?assertEqual(1, maps:size(State0#state.messages)), - ?assertEqual(0, queue:len(State0#state.returns)), + ?assertEqual(0, lqueue:len(State0#state.returns)), {State1, {_, _}} = deq(2, Cid, unsettled, State0), ?assertEqual(0, maps:size(State1#state.messages)), - ?assertEqual(0, queue:len(State1#state.returns)), + ?assertEqual(0, lqueue:len(State1#state.returns)), {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), ?assertEqual(1, maps:size(State2a#state.messages)), - ?assertEqual(1, queue:len(State2a#state.returns)), + ?assertEqual(1, lqueue:len(State2a#state.returns)), ok. down_with_noproc_enqueuer_is_cleaned_up_test() -> @@ -1579,6 +1601,26 @@ enq_check_settle_duplicate_test() -> % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). + +multi_return_snapshot_test() -> + %% this was discovered using property testing + C1 = {<<>>, c:pid(0,6723,1)}, + C2 = {<<0>>,c:pid(0,6723,1)}, + E = c:pid(0,6720,1), + Commands = [ + {checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg}, + {enqueue,E,2,msg}, + {checkout,cancel,C1}, %% both on returns queue + {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 + {return,[0],C2}, %% E1 in returns, E2 with C2 + {return,[1],C2}, %% E2 in returns E1 with C2 + {settle,[2],C2} %% E2 with C2 + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + + run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index e1848862fe..345a99a03c 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -15,83 +15,83 @@ -include_lib("ra/include/ra.hrl"). -compile({no_auto_import, [size/1]}). --record(state, {data = #{} :: #{integer() => term()}, - smallest :: undefined | non_neg_integer(), - largest :: undefined | non_neg_integer() - }). +-record(?MODULE, {data = #{} :: #{integer() => term()}, + smallest :: undefined | non_neg_integer(), + largest :: undefined | non_neg_integer() + }). --opaque state() :: #state{}. +-opaque state() :: #?MODULE{}. -export_type([state/0]). -spec empty() -> state(). empty() -> - #state{}. + #?MODULE{}. -spec fetch(integer(), state()) -> undefined | term(). -fetch(Key, #state{data = Data}) -> +fetch(Key, #?MODULE{data = Data}) -> maps:get(Key, Data, undefined). % only integer keys are supported -spec append(integer(), term(), state()) -> state(). append(Key, Value, - #state{data = Data, + #?MODULE{data = Data, smallest = Smallest, largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = ra_lib:default(Smallest, Key), largest = Key}. -spec return(integer(), term(), state()) -> state(). -return(Key, Value, #state{data = Data, smallest = Smallest} = State) +return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) when is_integer(Key) andalso Key < Smallest -> % TODO: this could potentially result in very large gaps which would % result in poor performance of smallest/1 % We could try to persist a linked list of "smallests" to make it quicker % to skip from one to the other - needs measurement - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = Key}; -return(Key, Value, #state{data = Data} = State) +return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> - State#state{data = maps:put(Key, Value, Data)}. + State#?MODULE{data = maps:put(Key, Value, Data)}. -spec delete(integer(), state()) -> state(). -delete(Smallest, #state{data = Data0, +delete(Smallest, #?MODULE{data = Data0, largest = Largest, smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> - State#state{data = Data, + State#?MODULE{data = Data, smallest = undefined, largest = undefined}; Next -> - State#state{data = Data, smallest = Next} + State#?MODULE{data = Data, smallest = Next} end; -delete(Key, #state{data = Data} = State) -> - State#state{data = maps:remove(Key, Data)}. +delete(Key, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:remove(Key, Data)}. -spec size(state()) -> non_neg_integer(). -size(#state{data = Data}) -> +size(#?MODULE{data = Data}) -> maps:size(Data). -spec smallest(state()) -> undefined | {integer(), term()}. -smallest(#state{smallest = undefined}) -> +smallest(#?MODULE{smallest = undefined}) -> undefined; -smallest(#state{smallest = Smallest, data = Data}) -> +smallest(#?MODULE{smallest = Smallest, data = Data}) -> {Smallest, maps:get(Smallest, Data)}. -spec next_key_after(non_neg_integer(), state()) -> undefined | integer(). -next_key_after(_Idx, #state{smallest = undefined}) -> +next_key_after(_Idx, #?MODULE{smallest = undefined}) -> % map must be empty undefined; -next_key_after(Idx, #state{smallest = Smallest, +next_key_after(Idx, #?MODULE{smallest = Smallest, largest = Largest}) when Idx+1 < Smallest orelse Idx+1 > Largest -> undefined; -next_key_after(Idx, #state{data = Data} = State) -> +next_key_after(Idx, #?MODULE{data = Data} = State) -> Next = Idx+1, case maps:is_key(Next, Data) of true -> @@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) -> end. -spec map(fun(), state()) -> state(). -map(F, #state{data = Data} = State) -> - State#state{data = maps:map(F, Data)}. +map(F, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:map(F, Data)}. %% internal |
