diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-31 17:45:36 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-02-01 11:15:10 +0000 |
| commit | fccacda101cb9a4f4968152228ba47a616d8587b (patch) | |
| tree | 24143a58f531ac880d23626eb35eac2c87efeb05 | |
| parent | 6ff8e97a086fe9bc6f05d7d5013de68a7d3dec94 (diff) | |
| download | rabbitmq-server-git-fccacda101cb9a4f4968152228ba47a616d8587b.tar.gz | |
Ensure rabbit_fifo snaphots are emitted
More regularly. Previously if more than one message was settled at a
time, stored potential snapshots could have been missed.
[#163631659]
| -rw-r--r-- | src/rabbit_fifo.erl | 133 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 121 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 14 |
3 files changed, 119 insertions, 149 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 247085460e..01331def3f 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -51,6 +51,7 @@ %% misc dehydrate_state/1, + normalize/1, %% protocol helpers make_enqueue/3, @@ -163,7 +164,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096 * 8). +-define(RELEASE_CURSOR_EVERY, 64000). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, @@ -198,7 +199,7 @@ -record(state, {name :: atom(), queue_resource :: rabbit_types:r('queue'), - shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(), + release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(), % unassigned messages messages = #{} :: #{msg_in_id() => indexed_msg()}, % defines the lowest message in id available in the messages map @@ -222,6 +223,8 @@ % index when there are large gaps but should be faster than gb_trees % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), + release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor, + ra_index(), state()}), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot consumers = #{} :: #{consumer_id() => #consumer{}}, @@ -258,7 +261,7 @@ queue_resource := rabbit_types:r('queue'), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), - shadow_copy_interval => non_neg_integer(), + release_cursor_interval => non_neg_integer(), max_length => non_neg_integer(), max_bytes => non_neg_integer(), single_active_consumer_on => boolean()}. @@ -287,7 +290,7 @@ init(#{name := Name, update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), + SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY), MaxLength = maps:get(max_length, Conf, undefined), MaxBytes = maps:get(max_bytes, Conf, undefined), ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of @@ -298,7 +301,7 @@ update_config(Conf, State) -> end, State#state{dead_letter_handler = DLH, become_leader_handler = BLH, - shadow_copy_interval = SHI, + release_cursor_interval = SHI, max_length = MaxLength, max_bytes = MaxBytes, consumer_strategy = ConsumerStrategy}. @@ -438,14 +441,12 @@ apply(#{index := RaftIdx}, #purge{}, returns = Returns, messages = Messages} = State0) -> Total = messages_ready(State0), - Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, - Indexes0, + Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, [I || {I, _} <- lists:sort(maps:values(Messages))]), - Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, - Indexes1, + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1, [I || {_, {I, _}} <- lqueue:to_list(Returns)]), {State, _, Effects} = - update_smallest_raft_index(RaftIdx, Indexes0, + update_smallest_raft_index(RaftIdx, State0#state{ra_indexes = Indexes, messages = #{}, returns = lqueue:new(), @@ -664,6 +665,7 @@ tick(_Ts, #state{name = Name, -spec overview(state()) -> map(). overview(#state{consumers = Cons, enqueuers = Enqs, + release_cursors = Cursors, msg_bytes_enqueue = EnqueueBytes, msg_bytes_checkout = CheckoutBytes} = State) -> #{type => ?MODULE, @@ -672,6 +674,7 @@ overview(#state{consumers = Cons, num_enqueuers => maps:size(Enqs), num_ready_messages => messages_ready(State), num_messages => messages_total(State), + num_release_cursors => lqueue:len(Cursors), enqueue_message_bytes => EnqueueBytes, checkout_message_bytes => CheckoutBytes}. @@ -815,7 +818,6 @@ messages_ready(#state{messages = M, messages_total(#state{ra_indexes = I, prefix_msgs = {PreR, PreM}}) -> - rabbit_fifo_index:size(I) + length(PreR) + length(PreM). update_use({inactive, _, _, _} = CUInfo, inactive) -> @@ -971,27 +973,33 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages, append_to_master_index(RaftIdx, #state{ra_indexes = Indexes0} = State0) -> State = incr_enqueue_count(State0), - Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0), + Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0), State#state{ra_indexes = Indexes}. incr_enqueue_count(#state{enqueue_count = C, - shadow_copy_interval = C} = State0) -> + release_cursor_interval = C} = State0) -> % this will trigger a dehydrated version of the state to be stored % at this raft index for potential future snapshot generation State0#state{enqueue_count = 0}; incr_enqueue_count(#state{enqueue_count = C} = State) -> State#state{enqueue_count = C + 1}. -maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0, - ra_indexes = Indexes} = State) -> - Dehydrated = dehydrate_state(State), - State#state{ra_indexes = - rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated, - Indexes)}; +maybe_store_dehydrated_state(RaftIdx, + #state{ra_indexes = Indexes, + enqueue_count = 0, + release_cursors = Cursors} = State) -> + case rabbit_fifo_index:exists(RaftIdx, Indexes) of + false -> + %% the incoming enqueue must already have been dropped + State; + true -> + Dehydrated = dehydrate_state(State), + Cursor = {release_cursor, RaftIdx, Dehydrated}, + State#state{release_cursors = lqueue:in(Cursor, Cursors)} + end; maybe_store_dehydrated_state(_RaftIdx, State) -> State. - enqueue_pending(From, #enqueuer{next_seqno = Next, pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0, @@ -1062,7 +1070,8 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded, credit = increase_credit(Con0, NumDiscarded)}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, + MsgRaftIdxs), {State0#state{consumers = Cons, ra_indexes = Indexes, service_queue = SQ}, Effects}. @@ -1081,7 +1090,7 @@ increase_credit(#consumer{credit = Current}, Credit) -> complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, - Effects0, #state{ra_indexes = Indexes0} = State0) -> + Effects0, State0) -> Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], @@ -1097,7 +1106,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, Con0, Checked, Effects0, State1), {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately - update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects). + update_smallest_raft_index(IncomingRaftIdx, State, Effects). dead_letter_effects(_Discarded, #state{dead_letter_handler = undefined}, @@ -1115,32 +1124,44 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> [{mod_call, rabbit_quorum_queue, cancel_consumer_handler, [QName, ConsumerId]} | Effects]. -update_smallest_raft_index(IncomingRaftIdx, OldIndexes, +update_smallest_raft_index(IncomingRaftIdx, #state{ra_indexes = Indexes, - messages = Messages} = State, Effects) -> + release_cursors = Cursors0} = State0, + Effects) -> case rabbit_fifo_index:size(Indexes) of - 0 when map_size(Messages) =:= 0 -> + 0 -> % there are no messages on queue anymore and no pending enqueues % we can forward release_cursor all the way until - % the last received command - {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]}; + % the last received command, hooray + State = State0#state{release_cursors = lqueue:new()}, + {State, ok, + [{release_cursor, IncomingRaftIdx, State} | Effects]}; _ -> - NewSmallest = rabbit_fifo_index:smallest(Indexes), - % Take the smallest raft index available in the index when starting - % to process this command - case {NewSmallest, rabbit_fifo_index:smallest(OldIndexes)} of - {{Smallest, _}, {Smallest, _}} -> - % smallest has not changed, do not issue release cursor - % effects - {State, ok, Effects}; - {_, {Smallest, Shadow}} when Shadow =/= undefined -> - {State, ok, [{release_cursor, Smallest, Shadow}]}; - _ -> % smallest - % no release cursor increase - {State, ok, Effects} + Smallest = rabbit_fifo_index:smallest(Indexes), + case find_next_cursor(Smallest, Cursors0) of + {empty, Cursors} -> + {State0#state{release_cursors = Cursors}, + ok, Effects}; + {Cursor, Cursors} -> + %% we can emit a release cursor we've passed the smallest + %% release cursor available. + {State0#state{release_cursors = Cursors}, ok, + [Cursor | Effects]} end end. +find_next_cursor(Idx, Cursors) -> + find_next_cursor(Idx, Cursors, empty). + +find_next_cursor(Smallest, Cursors0, Potential) -> + case lqueue:out(Cursors0) of + {{value, {_, Idx, _} = Cursor}, Cursors} when Idx < Smallest -> + %% we found one but it may not be the largest one + find_next_cursor(Smallest, Cursors, Cursor); + _ -> + {Potential, Cursors0} + end. + return_one(0, {'$prefix_msg', _} = Msg, #state{returns = Returns} = State0) -> add_bytes_return(Msg, @@ -1173,8 +1194,7 @@ checkout(#{index := Index}, State0, Effects0) -> case evaluate_limit(State0#state.ra_indexes, false, State1, Effects1) of {State, true, Effects} -> - update_smallest_raft_index(Index, State0#state.ra_indexes, - State, Effects); + update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> {State, ok, Effects} end. @@ -1431,6 +1451,7 @@ dehydrate_state(#state{messages = Messages, lists:sort(maps:to_list(Messages))), State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), + release_cursors = lqueue:new(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> dehydrate_consumer(C) @@ -1447,6 +1468,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> end, Checked0), Con#consumer{checked_out = Checked}. +%% make the state suitable for equality comparison +normalize(#state{release_cursors = Cursors} = State) -> + State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}. + is_over_limit(#state{max_length = undefined, max_bytes = undefined}) -> false; @@ -1570,7 +1595,7 @@ test_init(Name) -> init(#{name => Name, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(Name, utf8)), - shadow_copy_interval => 0}). + release_cursor_interval => 0}). % To launch these tests: make eunit EUNIT_MODS="rabbit_fifo" @@ -2109,7 +2134,7 @@ run_snapshot_test0(Name, Commands) -> end, Entries), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - ?assertEqual(State, S) + ?assertEqual(normalize(State), normalize(S)) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -2235,7 +2260,7 @@ single_active_consumer_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), ?assertEqual(single_active, State0#state.consumer_strategy), ?assertEqual(0, map_size(State0#state.consumers)), @@ -2309,7 +2334,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), DummyFunction = fun() -> ok end, @@ -2363,7 +2388,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), Meta = #{index => 1}, @@ -2403,7 +2428,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), DummyFunction = fun() -> ok end, @@ -2433,7 +2458,7 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), DummyFunction = fun() -> ok end, @@ -2463,7 +2488,7 @@ query_consumers_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => false}), % adding some consumers @@ -2501,7 +2526,7 @@ query_consumers_when_single_active_consumer_is_on_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), Meta = #{index => 1}, % adding some consumers @@ -2535,7 +2560,7 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() -> State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => false}), DummyFunction = fun() -> ok end, @@ -2568,7 +2593,7 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co State0 = init(#{name => ?FUNCTION_NAME, queue_resource => rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), - shadow_copy_interval => 0, + release_cursor_interval => 0, single_active_consumer_on => true}), DummyFunction = fun() -> ok end, diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 184002611e..82a75b4adc 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -2,26 +2,27 @@ -export([ empty/0, - fetch/2, - append/3, - update_if_present/3, - return/3, + exists/2, + append/2, delete/2, size/1, smallest/1, - next_key_after/2, - map/2, - to_map/1 + map/2 ]). -include_lib("ra/include/ra.hrl"). -compile({no_auto_import, [size/1]}). --record(?MODULE, {data = #{} :: #{integer() => term()}, +%% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes). +%% This matters as the data map gets persisted as part of the snapshot +-define(NIL, ''). + +-record(?MODULE, {data = #{} :: #{integer() => ?NIL}, smallest :: undefined | non_neg_integer(), largest :: undefined | non_neg_integer() }). + -opaque state() :: #?MODULE{}. -export_type([state/0]). @@ -30,44 +31,21 @@ empty() -> #?MODULE{}. --spec fetch(integer(), state()) -> undefined | term(). -fetch(Key, #?MODULE{data = Data}) -> - maps:get(Key, Data, undefined). +-spec exists(integer(), state()) -> boolean(). +exists(Key, #?MODULE{data = Data}) -> + maps:is_key(Key, Data). % only integer keys are supported --spec append(integer(), term(), state()) -> state(). -append(Key, Value, +-spec append(integer(), state()) -> state(). +append(Key, #?MODULE{data = Data, smallest = Smallest, largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> - State#?MODULE{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, ?NIL, Data), smallest = ra_lib:default(Smallest, Key), largest = Key}. --spec update_if_present(integer(), term(), state()) -> state(). -update_if_present(Key, Value, #?MODULE{data = Data} = State) -> - case Data of - #{Key := _} -> - State#?MODULE{data = maps:put(Key, Value, Data)}; - _ -> - State - end. - - --spec return(integer(), term(), state()) -> state(). -return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) - when is_integer(Key) andalso Key < Smallest -> - % TODO: this could potentially result in very large gaps which would - % result in poor performance of smallest/1 - % We could try to persist a linked list of "smallests" to make it quicker - % to skip from one to the other - needs measurement - State#?MODULE{data = maps:put(Key, Value, Data), - smallest = Key}; -return(Key, Value, #?MODULE{data = Data} = State) - when is_integer(Key) -> - State#?MODULE{data = maps:put(Key, Value, Data)}. - -spec delete(Index :: integer(), state()) -> state(). delete(Smallest, #?MODULE{data = Data0, largest = Largest, @@ -76,8 +54,8 @@ delete(Smallest, #?MODULE{data = Data0, case find_next(Smallest + 1, Largest, Data) of undefined -> State#?MODULE{data = Data, - smallest = undefined, - largest = undefined}; + smallest = undefined, + largest = undefined}; Next -> State#?MODULE{data = Data, smallest = Next} end; @@ -88,33 +66,10 @@ delete(Key, #?MODULE{data = Data} = State) -> size(#?MODULE{data = Data}) -> maps:size(Data). --spec to_map(state()) -> #{integer() => term()}. -to_map(#?MODULE{data = Data}) -> - Data. - --spec smallest(state()) -> undefined | {integer(), term()}. -smallest(#?MODULE{smallest = undefined}) -> - undefined; -smallest(#?MODULE{smallest = Smallest, data = Data}) -> - {Smallest, maps:get(Smallest, Data)}. - +-spec smallest(state()) -> undefined | integer(). +smallest(#?MODULE{smallest = Smallest}) -> + Smallest. --spec next_key_after(non_neg_integer(), state()) -> undefined | integer(). -next_key_after(_Idx, #?MODULE{smallest = undefined}) -> - % map must be empty - undefined; -next_key_after(Idx, #?MODULE{smallest = Smallest, - largest = Largest}) - when Idx+1 < Smallest orelse Idx+1 > Largest -> - undefined; -next_key_after(Idx, #?MODULE{data = Data} = State) -> - Next = Idx+1, - case maps:is_key(Next, Data) of - true -> - Next; - false -> - next_key_after(Next, State) - end. -spec map(fun(), state()) -> state(). map(F, #?MODULE{data = Data} = State) -> @@ -142,40 +97,24 @@ find_next(Next, Last, Map) -> append_test() -> S0 = empty(), - undefined = fetch(99, S0), + false = exists(99, S0), undefined = smallest(S0), 0 = size(S0), - S1 = append(1, one, S0), - undefined = fetch(99, S1), - one = fetch(1, S1), + S1 = append(1, S0), + false = exists(99, S1), + true = exists(1, S1), 1 = size(S1), - {1, one} = smallest(S1), - S2 = append(2, two, S1), - two = fetch(2, S2), + 1 = smallest(S1), + S2 = append(2, S1), + true = exists(2, S2), 2 = size(S2), - {1, one} = smallest(S2), + 1 = smallest(S2), S3 = delete(1, S2), - {2, two} = smallest(S3), + 2 = smallest(S3), 1 = size(S3), - S4 = return(1, one, S3), - one = fetch(1, S4), - 2 = size(S4), - {1, one} = smallest(S4), - S5 = delete(2, delete(1, S4)), + S5 = delete(2, S3), undefined = smallest(S5), 0 = size(S0), ok. -next_after_test() -> - S = append(3, three, - append(2, two, - append(1, one, - empty()))), - 1 = next_key_after(0, S), - 2 = next_key_after(1, S), - 3 = next_key_after(2, S), - undefined = next_key_after(3, S), - undefined = next_key_after(4, S), - ok. - -endif. diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 5643da1991..c9a9183b5f 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -310,7 +310,7 @@ checkout_gen(Pid) -> -record(t, {state = rabbit_fifo:init(#{name => proper, queue_resource => blah, - shadow_copy_interval => 1}) + release_cursor_interval => 1}) :: rabbit_fifo:state(), index = 1 :: non_neg_integer(), %% raft index enqueuers = #{} :: #{pid() => term()}, @@ -473,7 +473,8 @@ run_snapshot_test(Conf, Commands) -> run_snapshot_test0(Conf, Commands) -> Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), - {State, Effects} = run_log(test_init(Conf), Entries), + {State0, Effects} = run_log(test_init(Conf), Entries), + State = rabbit_fifo:normalize(State0), % ct:pal("beginning snapshot test run for ~w numn commands ~b", % [maps:get(name, Conf), length(Commands)]), @@ -482,7 +483,12 @@ run_snapshot_test0(Conf, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), - {S, _} = run_log(SnapState, Filtered), + % ct:pal("running from snapshot at ~w ~p", [SnapIdx, SnapState]), + % ct:pal("Snapshot tests run log:~n" + % "~p~n from ~n~p~n Entries~n~p~n", + % [Filtered, SnapState, Entries]), + {S0, _} = run_log(SnapState, Filtered), + S = rabbit_fifo:normalize(S0), % assert log can be restored from any release cursor index case S of State -> ok; @@ -516,7 +522,7 @@ run_log(InitState, Entries) -> test_init(Conf) -> Default = #{queue_resource => blah, - shadow_copy_interval => 0, + release_cursor_interval => 0, metrics_handler => {?MODULE, metrics_handler, []}}, rabbit_fifo:init(maps:merge(Default, Conf)). |
