summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-31 17:45:36 +0000
committerkjnilsson <knilsson@pivotal.io>2019-02-01 11:15:10 +0000
commitfccacda101cb9a4f4968152228ba47a616d8587b (patch)
tree24143a58f531ac880d23626eb35eac2c87efeb05
parent6ff8e97a086fe9bc6f05d7d5013de68a7d3dec94 (diff)
downloadrabbitmq-server-git-fccacda101cb9a4f4968152228ba47a616d8587b.tar.gz
Ensure rabbit_fifo snaphots are emitted
More regularly. Previously if more than one message was settled at a time, stored potential snapshots could have been missed. [#163631659]
-rw-r--r--src/rabbit_fifo.erl133
-rw-r--r--src/rabbit_fifo_index.erl121
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl14
3 files changed, 119 insertions, 149 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 247085460e..01331def3f 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -51,6 +51,7 @@
%% misc
dehydrate_state/1,
+ normalize/1,
%% protocol helpers
make_enqueue/3,
@@ -163,7 +164,7 @@
-type applied_mfa() :: {module(), atom(), list()}.
% represents a partially applied module call
--define(SHADOW_COPY_INTERVAL, 4096 * 8).
+-define(RELEASE_CURSOR_EVERY, 64000).
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
@@ -198,7 +199,7 @@
-record(state,
{name :: atom(),
queue_resource :: rabbit_types:r('queue'),
- shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(),
+ release_cursor_interval = ?RELEASE_CURSOR_EVERY :: non_neg_integer(),
% unassigned messages
messages = #{} :: #{msg_in_id() => indexed_msg()},
% defines the lowest message in id available in the messages map
@@ -222,6 +223,8 @@
% index when there are large gaps but should be faster than gb_trees
% for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
+ release_cursors = lqueue:new() :: lqueue:lqueue({release_cursor,
+ ra_index(), state()}),
% consumers need to reflect consumer state at time of snapshot
% needs to be part of snapshot
consumers = #{} :: #{consumer_id() => #consumer{}},
@@ -258,7 +261,7 @@
queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- shadow_copy_interval => non_neg_integer(),
+ release_cursor_interval => non_neg_integer(),
max_length => non_neg_integer(),
max_bytes => non_neg_integer(),
single_active_consumer_on => boolean()}.
@@ -287,7 +290,7 @@ init(#{name := Name,
update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
+ SHI = maps:get(release_cursor_interval, Conf, ?RELEASE_CURSOR_EVERY),
MaxLength = maps:get(max_length, Conf, undefined),
MaxBytes = maps:get(max_bytes, Conf, undefined),
ConsumerStrategy = case maps:get(single_active_consumer_on, Conf, false) of
@@ -298,7 +301,7 @@ update_config(Conf, State) ->
end,
State#state{dead_letter_handler = DLH,
become_leader_handler = BLH,
- shadow_copy_interval = SHI,
+ release_cursor_interval = SHI,
max_length = MaxLength,
max_bytes = MaxBytes,
consumer_strategy = ConsumerStrategy}.
@@ -438,14 +441,12 @@ apply(#{index := RaftIdx}, #purge{},
returns = Returns,
messages = Messages} = State0) ->
Total = messages_ready(State0),
- Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2,
- Indexes0,
+ Indexes1 = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
[I || {I, _} <- lists:sort(maps:values(Messages))]),
- Indexes = lists:foldl(fun rabbit_fifo_index:delete/2,
- Indexes1,
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes1,
[I || {_, {I, _}} <- lqueue:to_list(Returns)]),
{State, _, Effects} =
- update_smallest_raft_index(RaftIdx, Indexes0,
+ update_smallest_raft_index(RaftIdx,
State0#state{ra_indexes = Indexes,
messages = #{},
returns = lqueue:new(),
@@ -664,6 +665,7 @@ tick(_Ts, #state{name = Name,
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
enqueuers = Enqs,
+ release_cursors = Cursors,
msg_bytes_enqueue = EnqueueBytes,
msg_bytes_checkout = CheckoutBytes} = State) ->
#{type => ?MODULE,
@@ -672,6 +674,7 @@ overview(#state{consumers = Cons,
num_enqueuers => maps:size(Enqs),
num_ready_messages => messages_ready(State),
num_messages => messages_total(State),
+ num_release_cursors => lqueue:len(Cursors),
enqueue_message_bytes => EnqueueBytes,
checkout_message_bytes => CheckoutBytes}.
@@ -815,7 +818,6 @@ messages_ready(#state{messages = M,
messages_total(#state{ra_indexes = I,
prefix_msgs = {PreR, PreM}}) ->
-
rabbit_fifo_index:size(I) + length(PreR) + length(PreM).
update_use({inactive, _, _, _} = CUInfo, inactive) ->
@@ -971,27 +973,33 @@ enqueue(RaftIdx, RawMsg, #state{messages = Messages,
append_to_master_index(RaftIdx,
#state{ra_indexes = Indexes0} = State0) ->
State = incr_enqueue_count(State0),
- Indexes = rabbit_fifo_index:append(RaftIdx, undefined, Indexes0),
+ Indexes = rabbit_fifo_index:append(RaftIdx, Indexes0),
State#state{ra_indexes = Indexes}.
incr_enqueue_count(#state{enqueue_count = C,
- shadow_copy_interval = C} = State0) ->
+ release_cursor_interval = C} = State0) ->
% this will trigger a dehydrated version of the state to be stored
% at this raft index for potential future snapshot generation
State0#state{enqueue_count = 0};
incr_enqueue_count(#state{enqueue_count = C} = State) ->
State#state{enqueue_count = C + 1}.
-maybe_store_dehydrated_state(RaftIdx, #state{enqueue_count = 0,
- ra_indexes = Indexes} = State) ->
- Dehydrated = dehydrate_state(State),
- State#state{ra_indexes =
- rabbit_fifo_index:update_if_present(RaftIdx, Dehydrated,
- Indexes)};
+maybe_store_dehydrated_state(RaftIdx,
+ #state{ra_indexes = Indexes,
+ enqueue_count = 0,
+ release_cursors = Cursors} = State) ->
+ case rabbit_fifo_index:exists(RaftIdx, Indexes) of
+ false ->
+ %% the incoming enqueue must already have been dropped
+ State;
+ true ->
+ Dehydrated = dehydrate_state(State),
+ Cursor = {release_cursor, RaftIdx, Dehydrated},
+ State#state{release_cursors = lqueue:in(Cursor, Cursors)}
+ end;
maybe_store_dehydrated_state(_RaftIdx, State) ->
State.
-
enqueue_pending(From,
#enqueuer{next_seqno = Next,
pending = [{Next, RaftIdx, RawMsg} | Pending]} = Enq0,
@@ -1062,7 +1070,8 @@ complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
credit = increase_credit(Con0, NumDiscarded)},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs),
+ Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
+ MsgRaftIdxs),
{State0#state{consumers = Cons,
ra_indexes = Indexes,
service_queue = SQ}, Effects}.
@@ -1081,7 +1090,7 @@ increase_credit(#consumer{credit = Current}, Credit) ->
complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
- Effects0, #state{ra_indexes = Indexes0} = State0) ->
+ Effects0, State0) ->
Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
@@ -1097,7 +1106,7 @@ complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
Con0, Checked, Effects0, State1),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
- update_smallest_raft_index(IncomingRaftIdx, Indexes0, State, Effects).
+ update_smallest_raft_index(IncomingRaftIdx, State, Effects).
dead_letter_effects(_Discarded,
#state{dead_letter_handler = undefined},
@@ -1115,32 +1124,44 @@ cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
[{mod_call, rabbit_quorum_queue,
cancel_consumer_handler, [QName, ConsumerId]} | Effects].
-update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
+update_smallest_raft_index(IncomingRaftIdx,
#state{ra_indexes = Indexes,
- messages = Messages} = State, Effects) ->
+ release_cursors = Cursors0} = State0,
+ Effects) ->
case rabbit_fifo_index:size(Indexes) of
- 0 when map_size(Messages) =:= 0 ->
+ 0 ->
% there are no messages on queue anymore and no pending enqueues
% we can forward release_cursor all the way until
- % the last received command
- {State, ok, [{release_cursor, IncomingRaftIdx, State} | Effects]};
+ % the last received command, hooray
+ State = State0#state{release_cursors = lqueue:new()},
+ {State, ok,
+ [{release_cursor, IncomingRaftIdx, State} | Effects]};
_ ->
- NewSmallest = rabbit_fifo_index:smallest(Indexes),
- % Take the smallest raft index available in the index when starting
- % to process this command
- case {NewSmallest, rabbit_fifo_index:smallest(OldIndexes)} of
- {{Smallest, _}, {Smallest, _}} ->
- % smallest has not changed, do not issue release cursor
- % effects
- {State, ok, Effects};
- {_, {Smallest, Shadow}} when Shadow =/= undefined ->
- {State, ok, [{release_cursor, Smallest, Shadow}]};
- _ -> % smallest
- % no release cursor increase
- {State, ok, Effects}
+ Smallest = rabbit_fifo_index:smallest(Indexes),
+ case find_next_cursor(Smallest, Cursors0) of
+ {empty, Cursors} ->
+ {State0#state{release_cursors = Cursors},
+ ok, Effects};
+ {Cursor, Cursors} ->
+ %% we can emit a release cursor we've passed the smallest
+ %% release cursor available.
+ {State0#state{release_cursors = Cursors}, ok,
+ [Cursor | Effects]}
end
end.
+find_next_cursor(Idx, Cursors) ->
+ find_next_cursor(Idx, Cursors, empty).
+
+find_next_cursor(Smallest, Cursors0, Potential) ->
+ case lqueue:out(Cursors0) of
+ {{value, {_, Idx, _} = Cursor}, Cursors} when Idx < Smallest ->
+ %% we found one but it may not be the largest one
+ find_next_cursor(Smallest, Cursors, Cursor);
+ _ ->
+ {Potential, Cursors0}
+ end.
+
return_one(0, {'$prefix_msg', _} = Msg,
#state{returns = Returns} = State0) ->
add_bytes_return(Msg,
@@ -1173,8 +1194,7 @@ checkout(#{index := Index}, State0, Effects0) ->
case evaluate_limit(State0#state.ra_indexes, false,
State1, Effects1) of
{State, true, Effects} ->
- update_smallest_raft_index(Index, State0#state.ra_indexes,
- State, Effects);
+ update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
{State, ok, Effects}
end.
@@ -1431,6 +1451,7 @@ dehydrate_state(#state{messages = Messages,
lists:sort(maps:to_list(Messages))),
State#state{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
+ release_cursors = lqueue:new(),
low_msg_num = undefined,
consumers = maps:map(fun (_, C) ->
dehydrate_consumer(C)
@@ -1447,6 +1468,10 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
end, Checked0),
Con#consumer{checked_out = Checked}.
+%% make the state suitable for equality comparison
+normalize(#state{release_cursors = Cursors} = State) ->
+ State#state{release_cursors = lqueue:from_list(lqueue:to_list(Cursors))}.
+
is_over_limit(#state{max_length = undefined,
max_bytes = undefined}) ->
false;
@@ -1570,7 +1595,7 @@ test_init(Name) ->
init(#{name => Name,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(Name, utf8)),
- shadow_copy_interval => 0}).
+ release_cursor_interval => 0}).
% To launch these tests: make eunit EUNIT_MODS="rabbit_fifo"
@@ -2109,7 +2134,7 @@ run_snapshot_test0(Name, Commands) ->
end, Entries),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- ?assertEqual(State, S)
+ ?assertEqual(normalize(State), normalize(S))
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -2235,7 +2260,7 @@ single_active_consumer_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
?assertEqual(single_active, State0#state.consumer_strategy),
?assertEqual(0, map_size(State0#state.consumers)),
@@ -2309,7 +2334,7 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
@@ -2363,7 +2388,7 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
Meta = #{index => 1},
@@ -2403,7 +2428,7 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
@@ -2433,7 +2458,7 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
@@ -2463,7 +2488,7 @@ query_consumers_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => false}),
% adding some consumers
@@ -2501,7 +2526,7 @@ query_consumers_when_single_active_consumer_is_on_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
Meta = #{index => 1},
% adding some consumers
@@ -2535,7 +2560,7 @@ active_flag_updated_when_consumer_suspected_unsuspected_test() ->
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => false}),
DummyFunction = fun() -> ok end,
@@ -2568,7 +2593,7 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
State0 = init(#{name => ?FUNCTION_NAME,
queue_resource => rabbit_misc:r("/", queue,
atom_to_binary(?FUNCTION_NAME, utf8)),
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl
index 184002611e..82a75b4adc 100644
--- a/src/rabbit_fifo_index.erl
+++ b/src/rabbit_fifo_index.erl
@@ -2,26 +2,27 @@
-export([
empty/0,
- fetch/2,
- append/3,
- update_if_present/3,
- return/3,
+ exists/2,
+ append/2,
delete/2,
size/1,
smallest/1,
- next_key_after/2,
- map/2,
- to_map/1
+ map/2
]).
-include_lib("ra/include/ra.hrl").
-compile({no_auto_import, [size/1]}).
--record(?MODULE, {data = #{} :: #{integer() => term()},
+%% the empty atom is a lot smaller (4 bytes) than e.g. `undefined` (13 bytes).
+%% This matters as the data map gets persisted as part of the snapshot
+-define(NIL, '').
+
+-record(?MODULE, {data = #{} :: #{integer() => ?NIL},
smallest :: undefined | non_neg_integer(),
largest :: undefined | non_neg_integer()
}).
+
-opaque state() :: #?MODULE{}.
-export_type([state/0]).
@@ -30,44 +31,21 @@
empty() ->
#?MODULE{}.
--spec fetch(integer(), state()) -> undefined | term().
-fetch(Key, #?MODULE{data = Data}) ->
- maps:get(Key, Data, undefined).
+-spec exists(integer(), state()) -> boolean().
+exists(Key, #?MODULE{data = Data}) ->
+ maps:is_key(Key, Data).
% only integer keys are supported
--spec append(integer(), term(), state()) -> state().
-append(Key, Value,
+-spec append(integer(), state()) -> state().
+append(Key,
#?MODULE{data = Data,
smallest = Smallest,
largest = Largest} = State)
when Key > Largest orelse Largest =:= undefined ->
- State#?MODULE{data = maps:put(Key, Value, Data),
+ State#?MODULE{data = maps:put(Key, ?NIL, Data),
smallest = ra_lib:default(Smallest, Key),
largest = Key}.
--spec update_if_present(integer(), term(), state()) -> state().
-update_if_present(Key, Value, #?MODULE{data = Data} = State) ->
- case Data of
- #{Key := _} ->
- State#?MODULE{data = maps:put(Key, Value, Data)};
- _ ->
- State
- end.
-
-
--spec return(integer(), term(), state()) -> state().
-return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State)
- when is_integer(Key) andalso Key < Smallest ->
- % TODO: this could potentially result in very large gaps which would
- % result in poor performance of smallest/1
- % We could try to persist a linked list of "smallests" to make it quicker
- % to skip from one to the other - needs measurement
- State#?MODULE{data = maps:put(Key, Value, Data),
- smallest = Key};
-return(Key, Value, #?MODULE{data = Data} = State)
- when is_integer(Key) ->
- State#?MODULE{data = maps:put(Key, Value, Data)}.
-
-spec delete(Index :: integer(), state()) -> state().
delete(Smallest, #?MODULE{data = Data0,
largest = Largest,
@@ -76,8 +54,8 @@ delete(Smallest, #?MODULE{data = Data0,
case find_next(Smallest + 1, Largest, Data) of
undefined ->
State#?MODULE{data = Data,
- smallest = undefined,
- largest = undefined};
+ smallest = undefined,
+ largest = undefined};
Next ->
State#?MODULE{data = Data, smallest = Next}
end;
@@ -88,33 +66,10 @@ delete(Key, #?MODULE{data = Data} = State) ->
size(#?MODULE{data = Data}) ->
maps:size(Data).
--spec to_map(state()) -> #{integer() => term()}.
-to_map(#?MODULE{data = Data}) ->
- Data.
-
--spec smallest(state()) -> undefined | {integer(), term()}.
-smallest(#?MODULE{smallest = undefined}) ->
- undefined;
-smallest(#?MODULE{smallest = Smallest, data = Data}) ->
- {Smallest, maps:get(Smallest, Data)}.
-
+-spec smallest(state()) -> undefined | integer().
+smallest(#?MODULE{smallest = Smallest}) ->
+ Smallest.
--spec next_key_after(non_neg_integer(), state()) -> undefined | integer().
-next_key_after(_Idx, #?MODULE{smallest = undefined}) ->
- % map must be empty
- undefined;
-next_key_after(Idx, #?MODULE{smallest = Smallest,
- largest = Largest})
- when Idx+1 < Smallest orelse Idx+1 > Largest ->
- undefined;
-next_key_after(Idx, #?MODULE{data = Data} = State) ->
- Next = Idx+1,
- case maps:is_key(Next, Data) of
- true ->
- Next;
- false ->
- next_key_after(Next, State)
- end.
-spec map(fun(), state()) -> state().
map(F, #?MODULE{data = Data} = State) ->
@@ -142,40 +97,24 @@ find_next(Next, Last, Map) ->
append_test() ->
S0 = empty(),
- undefined = fetch(99, S0),
+ false = exists(99, S0),
undefined = smallest(S0),
0 = size(S0),
- S1 = append(1, one, S0),
- undefined = fetch(99, S1),
- one = fetch(1, S1),
+ S1 = append(1, S0),
+ false = exists(99, S1),
+ true = exists(1, S1),
1 = size(S1),
- {1, one} = smallest(S1),
- S2 = append(2, two, S1),
- two = fetch(2, S2),
+ 1 = smallest(S1),
+ S2 = append(2, S1),
+ true = exists(2, S2),
2 = size(S2),
- {1, one} = smallest(S2),
+ 1 = smallest(S2),
S3 = delete(1, S2),
- {2, two} = smallest(S3),
+ 2 = smallest(S3),
1 = size(S3),
- S4 = return(1, one, S3),
- one = fetch(1, S4),
- 2 = size(S4),
- {1, one} = smallest(S4),
- S5 = delete(2, delete(1, S4)),
+ S5 = delete(2, S3),
undefined = smallest(S5),
0 = size(S0),
ok.
-next_after_test() ->
- S = append(3, three,
- append(2, two,
- append(1, one,
- empty()))),
- 1 = next_key_after(0, S),
- 2 = next_key_after(1, S),
- 3 = next_key_after(2, S),
- undefined = next_key_after(3, S),
- undefined = next_key_after(4, S),
- ok.
-
-endif.
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 5643da1991..c9a9183b5f 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -310,7 +310,7 @@ checkout_gen(Pid) ->
-record(t, {state = rabbit_fifo:init(#{name => proper,
queue_resource => blah,
- shadow_copy_interval => 1})
+ release_cursor_interval => 1})
:: rabbit_fifo:state(),
index = 1 :: non_neg_integer(), %% raft index
enqueuers = #{} :: #{pid() => term()},
@@ -473,7 +473,8 @@ run_snapshot_test(Conf, Commands) ->
run_snapshot_test0(Conf, Commands) ->
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
- {State, Effects} = run_log(test_init(Conf), Entries),
+ {State0, Effects} = run_log(test_init(Conf), Entries),
+ State = rabbit_fifo:normalize(State0),
% ct:pal("beginning snapshot test run for ~w numn commands ~b",
% [maps:get(name, Conf), length(Commands)]),
@@ -482,7 +483,12 @@ run_snapshot_test0(Conf, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- {S, _} = run_log(SnapState, Filtered),
+ % ct:pal("running from snapshot at ~w ~p", [SnapIdx, SnapState]),
+ % ct:pal("Snapshot tests run log:~n"
+ % "~p~n from ~n~p~n Entries~n~p~n",
+ % [Filtered, SnapState, Entries]),
+ {S0, _} = run_log(SnapState, Filtered),
+ S = rabbit_fifo:normalize(S0),
% assert log can be restored from any release cursor index
case S of
State -> ok;
@@ -516,7 +522,7 @@ run_log(InitState, Entries) ->
test_init(Conf) ->
Default = #{queue_resource => blah,
- shadow_copy_interval => 0,
+ release_cursor_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}},
rabbit_fifo:init(maps:merge(Default, Conf)).