summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-03-18 16:30:15 +0000
committerkjnilsson <knilsson@pivotal.io>2019-03-18 16:30:15 +0000
commit568bbd358487195ee325e57c8b72ec7a5d7735ea (patch)
treef493bf4e35d89f2803979112505c857d9e963066
parenta58bc89f73c9af27fda43adcb2639b1f474b5aa5 (diff)
downloadrabbitmq-server-git-568bbd358487195ee325e57c8b72ec7a5d7735ea.tar.gz
rabbit_fifo bugfixes
Bugfixes around noconnect consumer handling as well as refactoring to make these kinds of errors less likely. [#164691411]
-rw-r--r--src/rabbit_fifo.erl221
-rw-r--r--test/rabbit_fifo_SUITE.erl74
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl2
3 files changed, 185 insertions, 112 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index e551834490..c09c8a4f22 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -184,11 +184,9 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
case Cons0 of
- #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
- Checked = maps:without(MsgIds, Checked0),
+ #{ConsumerId := #consumer{checked_out = Checked0}} ->
Returned = maps:with(MsgIds, Checked0),
- MsgNumMsgs = maps:values(Returned),
- return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, Returned, [], State);
_ ->
{State, ok}
end;
@@ -318,17 +316,23 @@ apply(_, {down, Pid, noconnection},
Node = node(Pid),
%% if the pid refers to the active consumer, mark it as suspected and return
%% it to the waiting queue
- {State1, Effects0} = case maps:to_list(Cons0) of
- [{{_, P} = Cid, C}] when node(P) =:= Node ->
- %% the consumer should be returned to waiting
- %%
- Effs = consumer_update_active_effects(
- State0, Cid, C, false, suspected_down, []),
- {State0#?MODULE{consumers = #{},
- waiting_consumers = Waiting0 ++ [{Cid, C}]},
- Effs};
- _ -> {State0, []}
- end,
+ {State1, Effects0} =
+ case maps:to_list(Cons0) of
+ [{{_, P} = Cid, C0}] when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ State0, Cid, C0, false, suspected_down, []),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(State0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ #{Cid := C} = St#?MODULE.consumers,
+ {St#?MODULE{consumers = #{},
+ waiting_consumers = Waiting0 ++ [{Cid, C}]},
+ Effs1};
+ _ -> {State0, []}
+ end,
WaitingConsumers = update_waiting_consumer_status(Node, State1,
suspected_down),
@@ -357,21 +361,20 @@ apply(_, {down, Pid, noconnection},
Node = node(Pid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- {Cons, State, Effects1} =
- maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0,
- status = up} = C,
- {Co, St0, Eff}) when node(P) =:= Node ->
- {St, Eff0} = return_all(St0, Checked0, Eff, K, C),
- Credit = increase_credit(C, maps:size(Checked0)),
- Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
+ {State, Effects1} =
+ maps:fold(fun({_, P} = Cid, #consumer{checked_out = Checked0,
+ status = up} = C0,
+ {St0, Eff}) when node(P) =:= Node ->
+ Credit = increase_credit(C0, map_size(Checked0)),
+ C = C0#consumer{status = suspected_down,
+ credit = Credit},
+ {St, Eff0} = return_all(St0, Eff, Cid, C),
+ Eff1 = ConsumerUpdateActiveFun(St, Cid, C, false,
suspected_down, Eff0),
- {maps:put(K, C#consumer{status = suspected_down,
- credit = Credit,
- checked_out = #{}}, Co),
- St, Eff1};
- (K, C, {Co, St, Eff}) ->
- {maps:put(K, C, Co), St, Eff}
- end, {#{}, State0, []}, Cons0),
+ {St, Eff1};
+ (_, _, {St, Eff}) ->
+ {St, Eff}
+ end, {State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = suspected_down};
(_, E) -> E
@@ -380,14 +383,14 @@ apply(_, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes
- Effects2 = case maps:size(Cons) of
+ Effects2 = case maps:size(State#?MODULE.consumers) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
_ ->
[{monitor, node, Node}]
end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2};
+ {State#?MODULE{enqueuers = Enqs}, ok, Effects2};
apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -795,9 +798,9 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
| Effects].
cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
- case maps:take(ConsumerId, C0) of
- {Consumer, Cons1} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
+ case C0 of
+ #{ConsumerId := Consumer} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
Effects0, Reason),
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
@@ -810,7 +813,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
_ ->
{S, Effects}
end;
- error ->
+ _ ->
%% already removed: do nothing
{S0, Effects0}
end.
@@ -847,9 +850,9 @@ activate_next_consumer(#?MODULE{consumers = Cons,
-maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
- Cons1, #?MODULE{consumers = C0,
- service_queue = SQ0} = S0,
+maybe_return_all(ConsumerId, Consumer,
+ #?MODULE{consumers = C0,
+ service_queue = SQ0} = S0,
Effects0, Reason) ->
case Reason of
consumer_cancel ->
@@ -859,11 +862,12 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
credit = 0,
status = cancelled},
C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1};
+ {S0#?MODULE{consumers = Cons,
+ service_queue = SQ}, Effects1};
down ->
- {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId,
- Consumer),
- {S1#?MODULE{consumers = Cons1}, Effects1}
+ {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -980,38 +984,43 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
- Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) ->
- Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgNumMsgs))},
- {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects0),
- {State1, Effects2} = lists:foldl(
- fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0,
- ConsumerId, Con);
- ({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0,
- ConsumerId, Con)
- end, {State0, Effects1}, MsgNumMsgs),
- checkout(Meta, State1#?MODULE{consumers = Cons,
- service_queue = SQ},
- Effects2).
+return(Meta, ConsumerId, Returned,
+ Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ {State1, Effects1} = maps:fold(
+ fun(MsgId, {'$prefix_msg', _} = Msg, {S0, E0}) ->
+ return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ (MsgId, {MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgId, MsgNum, Msg, S0, E0,
+ ConsumerId)
+ end, {State0, Effects0}, Returned),
+ #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers,
+ Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))},
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ SQ0, Effects1),
+ State = State1#?MODULE{consumers = Cons,
+ service_queue = SQ},
+ checkout(Meta, State, Effects2).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
- Con0, Checked, Effects0,
+complete(ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
+ MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
- Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, NumDiscarded)},
+ Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
+ credit = increase_credit(Con0, maps:size(Discarded))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
- {State0#?MODULE{consumers = Cons,
+ State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
+ add_bytes_settle(RawMsg, Acc);
+ ({'$prefix_msg', _} = M, Acc) ->
+ add_bytes_settle(M, Acc)
+ end, State0, maps:values(Discarded)),
+ {State1#?MODULE{consumers = Cons,
ra_indexes = Indexes,
service_queue = SQ}, Effects}.
@@ -1030,19 +1039,9 @@ increase_credit(#consumer{credit = Current}, Credit) ->
complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
- Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
- MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
- State1 = lists:foldl(fun({_, {_, {_, RawMsg}}}, Acc) ->
- add_bytes_settle(RawMsg, Acc);
- ({'$prefix_msg', _} = M, Acc) ->
- add_bytes_settle(M, Acc)
- end, State0, maps:values(Discarded)),
- %% need to pass the length of discarded as $prefix_msgs would be filtered
- %% by the above list comprehension
- {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
- maps:size(Discarded),
- Con0, Checked, Effects0, State1),
+ {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ Effects0, State0),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
@@ -1102,69 +1101,70 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {'$prefix_msg', Header0},
+return_one(MsgId, 0, {'$prefix_msg', Header0},
#?MODULE{returns = Returns,
+ consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) ->
- Header = maps:update_with(delivery_count,
- fun (C) -> C+1 end,
+ Effects0, ConsumerId) ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
Msg = {'$prefix_msg', Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- Checked = Con#consumer.checked_out,
- {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
- Effects0, State0),
- {add_bytes_settle(Msg, State1), Effects};
+ complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
- {add_bytes_return(Msg,
- State0#?MODULE{returns = lqueue:in(Msg, Returns)}),
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ {add_bytes_return(
+ Msg,
+ State0#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
-return_one(MsgNum, {RaftId, {Header0, RawMsg}},
+return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
+ consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) ->
- Header = maps:update_with(delivery_count,
- fun (C) -> C+1 end,
+ Effects0, ConsumerId) ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
DlMsg = {MsgNum, Msg},
- Effects = dead_letter_effects(delivery_limit,
- #{none => DlMsg},
+ Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
- Checked = Con#consumer.checked_out,
- {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
- Effects, State0),
- {add_bytes_settle(RawMsg, State1), Effects1};
+ complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
%% this should not affect the release cursor in any way
- {add_bytes_return(RawMsg,
- State0#?MODULE{returns =
- lqueue:in({MsgNum, Msg}, Returns)}),
+ {add_bytes_return(
+ RawMsg,
+ State0#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
-return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
+return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+ #consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(0, Msg, S, E, ConsumerId, Consumer);
- ({_, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
- end, {State0, Effects0}, Checked).
+ State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
+ lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ end, {State, Effects0}, Checked).
%% checkout new messages to consumers
%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, #{}),
- case evaluate_limit(State0#?MODULE.ra_indexes, false,
- State1, Effects1) of
+ case evaluate_limit(false, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
@@ -1187,17 +1187,16 @@ checkout0({Activity, State0}, Effects0, Acc) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(_OldIndexes, Result,
+evaluate_limit(Result,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(OldIndexes, Result,
- State0, Effects0) ->
+evaluate_limit(Result, State0, Effects0) ->
case is_over_limit(State0) of
true ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(OldIndexes, true, State, Effects);
+ evaluate_limit(true, State, Effects);
false ->
{State0, Result, Effects0}
end.
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index cf14a68d9d..fccda9772a 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -283,6 +283,19 @@ duplicate_enqueue_test(_) ->
?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
ok.
+return_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+ {State0, _} = enq(1, 1, msg, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ {State2, _} = check_auto(Cid2, 3, State1),
+ {State3, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [0]), State2),
+ ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
+ State3#rabbit_fifo.consumers),
+ ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
+ State3#rabbit_fifo.consumers),
+ ok.
+
return_non_existent_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
@@ -556,7 +569,7 @@ purge_with_checkout_test(_) ->
?assertEqual(1, maps:size(Checked)),
ok.
-down_returns_checked_out_in_order_test(_) ->
+down_noproc_returns_checked_out_in_order_test(_) ->
S0 = test_init(?FUNCTION_NAME),
%% enqueue 100
S1 = lists:foldl(fun (Num, FS0) ->
@@ -572,6 +585,30 @@ down_returns_checked_out_in_order_test(_) ->
{S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
Returns = lqueue:to_list(S#rabbit_fifo.returns),
?assertEqual(100, length(Returns)),
+ ?assertEqual(0, maps:size(S#rabbit_fifo.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+down_noconnection_returns_checked_out_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ NumMsgs = 20,
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, NumMsgs)),
+ ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
+ ?assertEqual(NumMsgs, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
+ Returns = lqueue:to_list(S#rabbit_fifo.returns),
+ ?assertEqual(NumMsgs, length(Returns)),
+ ?assertMatch(#consumer{checked_out = Ch}
+ when map_size(Ch) == 0,
+ maps:get(Cid, S#rabbit_fifo.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -736,6 +773,41 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
ok.
+single_active_returns_messages_on_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1],
+ ConsumerIds = [{_, DownPid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+ {State2, _} = enq(4, 1, msg1, State1),
+ % simulate node goes down
+ {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+ %% assert the consumer is up
+ ?assertMatch([_], lqueue:to_list(State3#rabbit_fifo.returns)),
+ ?assertMatch([{_, #consumer{checked_out = Checked}}]
+ when map_size(Checked) == 0,
+ State3#rabbit_fifo.waiting_consumers),
+
+ ok.
+
single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 76b8d7e715..0f5fb8bf58 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -285,7 +285,9 @@ scenario17(_Config) ->
make_checkout(C2, cancel),
make_enqueue(E,2,<<"two">>),
{nodeup,rabbit@fake_node1},
+ %% this has no effect as was returned
make_settle(C1, [0]),
+ %% this should settle "one"
make_settle(C1, [1])
],
run_snapshot_test(#{name => ?FUNCTION_NAME,