summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_fifo.erl248
-rw-r--r--src/rabbit_fifo.hrl7
-rw-r--r--src/rabbit_quorum_queue.erl6
-rw-r--r--test/quorum_queue_SUITE.erl4
-rw-r--r--test/rabbit_fifo_SUITE.erl330
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl17
-rw-r--r--test/single_active_consumer_SUITE.erl8
-rw-r--r--test/test_util.erl17
8 files changed, 438 insertions, 199 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index b06d34e83a..97e5f6e901 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -94,6 +94,7 @@
#credit{} |
#purge{} |
#update_config{}.
+
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types supported by ra fifo
@@ -134,7 +135,7 @@ update_config(Conf, State) ->
true ->
single_active;
false ->
- default
+ competing
end,
Cfg = State#?MODULE.cfg,
State#?MODULE{cfg = Cfg#cfg{release_cursor_interval = SHI,
@@ -194,7 +195,8 @@ apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
drain = Drain, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0,
- service_queue = ServiceQueue0} = State0) ->
+ service_queue = ServiceQueue0,
+ waiting_consumers = Waiting0} = State0) ->
case Cons0 of
#{ConsumerId := #consumer{delivery_count = DelCnt} = Con0} ->
%% this can go below 0 when credit is reduced
@@ -207,7 +209,7 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{State1, ok, Effects} =
checkout(Meta, State0#?MODULE{service_queue = ServiceQueue,
consumers = Cons}, []),
- Response = {send_credit_reply, maps:size(State1#?MODULE.messages)},
+ Response = {send_credit_reply, messages_ready(State1)},
%% by this point all checkouts for the updated credit value
%% should be processed so we can evaluate the drain
case Drain of
@@ -231,6 +233,20 @@ apply(Meta, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
{multi, [Response, {send_drained, [{CTag, Drained}]}]},
Effects}
end;
+ _ when Waiting0 /= [] ->
+ %% there are waiting consuemrs
+ case lists:keytake(ConsumerId, 1, Waiting0) of
+ {value, {_, Con0 = #consumer{delivery_count = DelCnt}}, Waiting} ->
+ %% the consumer is a waiting one
+ %% grant the credit
+ C = max(0, RemoteDelCnt + NewCredit - DelCnt),
+ Con = Con0#consumer{credit = C},
+ State = State0#?MODULE{waiting_consumers =
+ [{ConsumerId, Con} | Waiting]},
+ {State, {send_credit_reply, messages_ready(State)}};
+ false ->
+ {State0, ok}
+ end;
_ ->
%% credit for unknown consumer - just ignore
{State0, ok}
@@ -248,7 +264,8 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
{State0, {dequeue, empty}};
Ready ->
State1 = update_consumer(ConsumerId, ConsumerMeta,
- {once, 1, simple_prefetch}, State0),
+ {once, 1, simple_prefetch},
+ State0),
{success, _, MsgId, Msg, State2} = checkout_one(State1),
case Settlement of
unsettled ->
@@ -257,10 +274,9 @@ apply(Meta, #checkout{spec = {dequeue, Settlement},
[{monitor, process, Pid}]};
settled ->
%% immediately settle the checkout
- {State, _, Effects} = apply(Meta,
- make_settle(ConsumerId,
- [MsgId]),
- State2),
+ {State, _, Effects} =
+ apply(Meta, make_settle(ConsumerId, [MsgId]),
+ State2),
{State, {dequeue, {MsgId, Msg}, Ready-1}, Effects}
end
end;
@@ -294,27 +310,64 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
-apply(_, {down, ConsumerPid, noconnection},
+apply(_, {down, Pid, noconnection},
+ #?MODULE{consumers = Cons0,
+ cfg = #cfg{consumer_strategy = single_active},
+ waiting_consumers = Waiting0,
+ enqueuers = Enqs0} = State0) ->
+ Node = node(Pid),
+ %% if the pid refers to the active consumer, mark it as suspected and return
+ %% it to the waiting queue
+ {State1, Effects0} = case maps:to_list(Cons0) of
+ [{{_, P} = Cid, C}] when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %%
+ Effs = consumer_update_active_effects(
+ State0, Cid, C, false, suspected_down, []),
+ {State0#?MODULE{consumers = #{},
+ waiting_consumers = Waiting0 ++ [{Cid, C}]},
+ Effs};
+ _ -> {State0, []}
+ end,
+ WaitingConsumers = update_waiting_consumer_status(Node, State1,
+ suspected_down),
+
+ %% select a new consumer from the waiting queue and run a checkout
+ State2 = State1#?MODULE{waiting_consumers = WaitingConsumers},
+ {State, Effects1} = activate_next_consumer(State2, Effects0),
+
+ %% mark any enquers as suspected
+ Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
+ E#enqueuer{status = suspected_down};
+ (_, E) -> E
+ end, Enqs0),
+ Effects = [{monitor, node, Node} | Effects1],
+ {State#?MODULE{enqueuers = Enqs}, ok, Effects};
+apply(_, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
- Node = node(ConsumerPid),
+ %% A node has been disconnected. This doesn't necessarily mean that
+ %% any processes on this node are down, they _may_ come back so here
+ %% we just mark them as suspected (effectively deactivated)
+ %% and return all checked out messages to the main queue for delivery to any
+ %% live consumers
+ %%
+ %% all pids for the disconnected node will be marked as suspected not just
+ %% the one we got the `down' command for
+ Node = node(Pid),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- % mark all consumers and enqueuers as suspected down
- % and monitor the node so that we can find out the final state of the
- % process at some later point
+
{Cons, State, Effects1} =
- maps:fold(fun({_, P} = K,
- #consumer{checked_out = Checked0} = C,
- {Co, St0, Eff}) when (node(P) =:= Node) and
- (C#consumer.status =/= cancelled)->
+ maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0,
+ status = up} = C,
+ {Co, St0, Eff}) when node(P) =:= Node ->
{St, Eff0} = return_all(St0, Checked0, Eff, K, C),
Credit = increase_credit(C, maps:size(Checked0)),
Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
suspected_down, Eff0),
- {maps:put(K,
- C#consumer{status = suspected_down,
- credit = Credit,
- checked_out = #{}}, Co),
+ {maps:put(K, C#consumer{status = suspected_down,
+ credit = Credit,
+ checked_out = #{}}, Co),
St, Eff1};
(K, C, {Co, St, Eff}) ->
{maps:put(K, C, Co), St, Eff}
@@ -323,10 +376,10 @@ apply(_, {down, ConsumerPid, noconnection},
E#enqueuer{status = suspected_down};
(_, E) -> E
end, Enqs0),
- % mark waiting consumers as suspected if necessary
- WaitingConsumers = update_waiting_consumer_status(Node, State0,
- suspected_down),
+ % Monitor the node so that we can "unsuspect" these processes when the node
+ % comes back, then re-issue all monitors and discover the final fate of
+ % these processes
Effects2 = case maps:size(Cons) of
0 ->
[{aux, inactive}, {monitor, node, Node}];
@@ -334,8 +387,7 @@ apply(_, {down, ConsumerPid, noconnection},
[{monitor, node, Node}]
end ++ Effects1,
%% TODO: should we run a checkout here?
- {State#?MODULE{consumers = Cons, enqueuers = Enqs,
- waiting_consumers = WaitingConsumers}, ok, Effects2};
+ {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2};
apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
% Remove any enqueuer for the same pid and enqueue any pending messages
@@ -367,36 +419,36 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Monitors = [{monitor, process, P}
|| P <- suspected_pids_for(Node, State0)],
- % un-suspect waiting consumers when necessary
- WaitingConsumers = update_waiting_consumer_status(Node, State0, up),
-
Enqs1 = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = up};
(_, E) -> E
end, Enqs0),
ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- {Cons1, SQ, Effects} =
- maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
- when (node(P) =:= Node) and
- (C#consumer.status =/= cancelled) ->
- EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C,
- true, up, EAcc),
- update_or_remove_sub(
- ConsumerId, C#consumer{status = up},
- CAcc, SQAcc, EAcc1);
- (_, _, Acc) ->
- Acc
- end, {Cons0, SQ0, Monitors}, Cons0),
-
- checkout(Meta, State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
- service_queue = SQ,
- waiting_consumers = WaitingConsumers}, Effects);
+ %% mark all consumers as up
+ {Cons1, SQ, Effects1} =
+ maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc})
+ when (node(P) =:= Node) and
+ (C#consumer.status =/= cancelled) ->
+ EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId,
+ C, true, up, EAcc),
+ update_or_remove_sub(ConsumerId,
+ C#consumer{status = up}, CAcc,
+ SQAcc, EAcc1);
+ (_, _, Acc) ->
+ Acc
+ end, {Cons0, SQ0, Monitors}, Cons0),
+ Waiting = update_waiting_consumer_status(Node, State0, up),
+ State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ service_queue = SQ,
+ waiting_consumers = Waiting},
+ {State, Effects} = activate_next_consumer(State1, Effects1),
+ checkout(Meta, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
-consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = default}}) ->
+consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
ActivityStatus, Effects)
@@ -407,7 +459,7 @@ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = sin
end.
handle_waiting_consumer_down(_Pid,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State) ->
{[], State};
handle_waiting_consumer_down(_Pid,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
@@ -429,27 +481,18 @@ handle_waiting_consumer_down(Pid,
State = State0#?MODULE{waiting_consumers = StillUp},
{Effects, State}.
-update_waiting_consumer_status(_Node, #?MODULE{cfg = #cfg{consumer_strategy = default}},
- _Status) ->
- [];
-update_waiting_consumer_status(_Node,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = []},
- _Status) ->
- [];
update_waiting_consumer_status(Node,
- #?MODULE{cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers},
+ #?MODULE{waiting_consumers = WaitingConsumers},
Status) ->
[begin
- case node(P) of
+ case node(Pid) of
Node ->
{ConsumerId, Consumer#consumer{status = Status}};
_ ->
{ConsumerId, Consumer}
end
- end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers,
- Consumer#consumer.status =/= cancelled].
+ end || {{_, Pid} = ConsumerId, Consumer} <- WaitingConsumers,
+ Consumer#consumer.status =/= cancelled].
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #?MODULE{consumers = Cons,
@@ -583,7 +626,7 @@ query_consumers(#?MODULE{consumers = Consumers,
cfg = #cfg{consumer_strategy = ConsumerStrategy}} = State) ->
ActiveActivityStatusFun =
case ConsumerStrategy of
- default ->
+ competing ->
fun(_ConsumerId,
#consumer{status = Status}) ->
case Status of
@@ -709,8 +752,8 @@ num_checked_out(#?MODULE{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State, Effects, Reason) ->
- %% general case, single active consumer off
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State,
+ Effects, Reason) ->
cancel_consumer0(ConsumerId, State, Effects, Reason);
cancel_consumer(ConsumerId,
#?MODULE{cfg = #cfg{consumer_strategy = single_active},
@@ -721,41 +764,23 @@ cancel_consumer(ConsumerId,
cancel_consumer(ConsumerId,
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
- waiting_consumers = WaitingConsumers0} = State0,
+ waiting_consumers = Waiting0} = State0,
Effects0, Reason) ->
%% single active consumer on, consumers are waiting
- case maps:take(ConsumerId, Cons0) of
- {Consumer, Cons1} ->
+ case maps:is_key(ConsumerId, Cons0) of
+ true ->
% The active consumer is to be removed
- % Cancel it
- {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1,
- State0, Effects0, Reason),
- Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1),
- % Take another one from the waiting consumers and put it in consumers
- [{NewActiveConsumerId, NewActiveConsumer}
- | RemainingWaitingConsumers] = WaitingConsumers0,
- #?MODULE{service_queue = ServiceQueue} = State1,
- ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId,
- NewActiveConsumer,
- ServiceQueue),
- State = State1#?MODULE{consumers = maps:put(NewActiveConsumerId,
- NewActiveConsumer,
- State1#?MODULE.consumers),
- service_queue = ServiceQueue1,
- waiting_consumers = RemainingWaitingConsumers},
- Effects = consumer_update_active_effects(State, NewActiveConsumerId,
- NewActiveConsumer, true,
- single_active, Effects2),
- {State, Effects};
- error ->
+ {State1, Effects1} = cancel_consumer0(ConsumerId, State0,
+ Effects0, Reason),
+ activate_next_consumer(State1, Effects1);
+ false ->
% The cancelled consumer is not the active one
% Just remove it from idle_consumers
- WaitingConsumers = lists:keydelete(ConsumerId, 1,
- WaitingConsumers0),
+ Waiting = lists:keydelete(ConsumerId, 1, Waiting0),
Effects = cancel_consumer_effects(ConsumerId, State0, Effects0),
% A waiting consumer isn't supposed to have any checked out messages,
% so nothing special to do here
- {State0#?MODULE{waiting_consumers = WaitingConsumers}, Effects}
+ {State0#?MODULE{waiting_consumers = Waiting}, Effects}
end.
consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
@@ -765,9 +790,7 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
Ack = maps:get(ack, Meta, undefined),
Prefetch = maps:get(prefetch, Meta, undefined),
Args = maps:get(args, Meta, []),
- [{mod_call,
- rabbit_quorum_queue,
- update_consumer_handler,
+ [{mod_call, rabbit_quorum_queue, update_consumer_handler,
[QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]}
| Effects].
@@ -776,6 +799,10 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{Consumer, Cons1} ->
{S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
Effects0, Reason),
+ %% The effects are emitted before the consumer is actually removed
+ %% if the consumer has unacked messages. This is a bit weird but
+ %% in line with what classic queues do (from an external point of
+ %% view)
Effects = cancel_consumer_effects(ConsumerId, S, Effects2),
case maps:size(S#?MODULE.consumers) of
0 ->
@@ -788,6 +815,38 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
{S0, Effects0}
end.
+activate_next_consumer(#?MODULE{consumers = Cons,
+ waiting_consumers = Waiting0} = State0,
+ Effects0) ->
+ case maps:filter(fun (_, #consumer{status = S}) -> S == up end, Cons) of
+ Up when map_size(Up) == 0 ->
+ %% there are no active consumer in the consumer map
+ case lists:filter(fun ({_, #consumer{status = Status}}) ->
+ Status == up
+ end, Waiting0) of
+ [{NextConsumerId, NextConsumer} | _] ->
+ %% there is a potential next active consumer
+ Remaining = lists:keydelete(NextConsumerId, 1, Waiting0),
+ #?MODULE{service_queue = ServiceQueue} = State0,
+ ServiceQueue1 = maybe_queue_consumer(NextConsumerId,
+ NextConsumer,
+ ServiceQueue),
+ State = State0#?MODULE{consumers = Cons#{NextConsumerId => NextConsumer},
+ service_queue = ServiceQueue1,
+ waiting_consumers = Remaining},
+ Effects = consumer_update_active_effects(State, NextConsumerId,
+ NextConsumer, true,
+ single_active, Effects0),
+ {State, Effects};
+ [] ->
+ {State0, [{aux, inactive} | Effects0]}
+ end;
+ _ ->
+ {State0, Effects0}
+ end.
+
+
+
maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
Cons1, #?MODULE{consumers = C0,
service_queue = SQ0} = S0,
@@ -1296,7 +1355,7 @@ uniq_queue_in(Key, Queue) ->
end.
update_consumer(ConsumerId, Meta, Spec,
- #?MODULE{cfg = #cfg{consumer_strategy = default}} = State0) ->
+ #?MODULE{cfg = #cfg{consumer_strategy = competing}} = State0) ->
%% general case, single active consumer off
update_consumer0(ConsumerId, Meta, Spec, State0);
update_consumer(ConsumerId, Meta, Spec,
@@ -1331,7 +1390,6 @@ update_consumer0(ConsumerId, Meta, {Life, Credit, Mode},
end, Init, Cons0),
ServiceQueue = maybe_queue_consumer(ConsumerId, maps:get(ConsumerId, Cons),
ServiceQueue0),
-
State0#?MODULE{consumers = Cons, service_queue = ServiceQueue}.
maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
diff --git a/src/rabbit_fifo.hrl b/src/rabbit_fifo.hrl
index ebe5f3328a..968ae07739 100644
--- a/src/rabbit_fifo.hrl
+++ b/src/rabbit_fifo.hrl
@@ -88,6 +88,8 @@
-type consumer() :: #consumer{}.
+-type consumer_strategy() :: competing | single_active.
+
-record(enqueuer,
{next_seqno = 1 :: msg_seqno(),
% out of order enqueues - sorted list
@@ -104,7 +106,8 @@
max_length :: maybe(non_neg_integer()),
max_bytes :: maybe(non_neg_integer()),
%% whether single active consumer is on or not for this queue
- consumer_strategy = default :: default | single_active,
+ consumer_strategy = competing :: consumer_strategy(),
+ %% the maximum number of unsuccessful delivery attempts permitted
delivery_limit :: maybe(non_neg_integer())
}).
@@ -114,7 +117,7 @@
messages = #{} :: #{msg_in_id() => indexed_msg()},
% defines the lowest message in id available in the messages map
% that isn't a return
- low_msg_num :: msg_in_id() | undefined,
+ low_msg_num :: maybe(msg_in_id()),
% defines the next message in id to be added to the messages map
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index e811bfffb3..c685785d0d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -120,6 +120,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
RaMachine = ra_machine(NewQ),
ServerIds = [{RaName, Node} || Node <- Nodes],
ClusterName = RaName,
+ TickTimeout = application:get_env(rabbit, quorum_tick_interval, ?TICK_TIMEOUT),
RaConfs = [begin
UId = ra:new_uid(ra_lib:to_binary(ClusterName)),
FName = rabbit_misc:rs(QName),
@@ -129,7 +130,7 @@ declare(Q) when ?amqqueue_is_quorum(Q) ->
friendly_name => FName,
initial_members => ServerIds,
log_init_args => #{uid => UId},
- tick_timeout => ?TICK_TIMEOUT,
+ tick_timeout => TickTimeout,
machine => RaMachine}
end || ServerId <- ServerIds],
@@ -190,7 +191,8 @@ update_consumer(QName, ChPid, ConsumerTag, Exclusive, AckRequired, Prefetch, Act
QName, Prefetch, Active, ActivityStatus, Args).
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
- local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer, [QName, ChPid, ConsumerTag]).
+ local_or_remote_handler(ChPid, rabbit_quorum_queue, cancel_consumer,
+ [QName, ChPid, ConsumerTag]).
cancel_consumer(QName, ChPid, ConsumerTag) ->
catch rabbit_core_metrics:consumer_deleted(ChPid, ConsumerTag, QName),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 0ec65c31b8..bab28ba1ff 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -126,8 +126,10 @@ memory_tests() ->
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------
-init_per_suite(Config) ->
+init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config0, {rabbit, [{quorum_tick_interval, 1000}]}),
rabbit_ct_helpers:run_setup_steps(
Config,
[fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]).
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 72f09c3c64..cf14a68d9d 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -27,8 +27,7 @@ all() ->
%% replicate eunit like test resultion
all_tests() ->
[F || {F, _} <- ?MODULE:module_info(functions),
- re:run(atom_to_list(F), "_test$") /= nomatch]
- .
+ re:run(atom_to_list(F), "_test$") /= nomatch].
groups() ->
[
@@ -588,76 +587,89 @@ single_active_consumer_test(_) ->
% adding some consumers
AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- meta(1),
- make_checkout({CTag, self()},
- {once, 1, simple_prefetch},
- #{}),
- State),
- NewState
+ {NewState, _, _} = apply(
+ meta(1),
+ make_checkout({CTag, self()},
+ {once, 1, simple_prefetch},
+ #{}),
+ State),
+ NewState
end,
- State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(AddConsumer, State0,
+ [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ C3 = {<<"ctag3">>, self()},
+ C4 = {<<"ctag4">>, self()},
% the first registered consumer is the active one, the others are waiting
?assertEqual(1, map_size(State1#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State1#rabbit_fifo.consumers)),
+ ?assertMatch(#{C1 := _}, State1#rabbit_fifo.consumers),
?assertEqual(3, length(State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag3">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C3, 1, State1#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State1#rabbit_fifo.waiting_consumers)),
% cancelling a waiting consumer
{State2, _, Effects1} = apply(meta(2),
- make_checkout({<<"ctag3">>, self()},
- cancel, #{}), State1),
+ make_checkout(C3, cancel, #{}),
+ State1),
% the active consumer should still be in place
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag1">>, self()}, State2#rabbit_fifo.consumers)),
+ ?assertMatch(#{C1 := _}, State2#rabbit_fifo.consumers),
% the cancelled consumer has been removed from waiting consumers
?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag2">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C2, 1, State2#rabbit_fifo.waiting_consumers)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1, State2#rabbit_fifo.waiting_consumers)),
% there are some effects to unregister the consumer
- ?assertEqual(1, length(Effects1)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects1),
% cancelling the active consumer
{State3, _, Effects2} = apply(meta(3),
- make_checkout({<<"ctag1">>, self()},
- cancel, #{}),
+ make_checkout(C1, cancel, #{}),
State2),
% the second registered consumer is now the active one
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag2">>, self()}, State3#rabbit_fifo.consumers)),
+ ?assertMatch(#{C2 := _}, State3#rabbit_fifo.consumers),
% the new active consumer is no longer in the waiting list
?assertEqual(1, length(State3#rabbit_fifo.waiting_consumers)),
- ?assertNotEqual(false, lists:keyfind({<<"ctag4">>, self()}, 1, State3#rabbit_fifo.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects2)),
+ ?assertNotEqual(false, lists:keyfind(C4, 1,
+ State3#rabbit_fifo.waiting_consumers)),
+ %% should have a cancel consumer handler mod_call effect and
+ %% an active new consumer effect
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
% cancelling the active consumer
{State4, _, Effects3} = apply(meta(4),
- make_checkout({<<"ctag2">>, self()},
- cancel, #{}),
+ make_checkout(C2, cancel, #{}),
State3),
% the last waiting consumer became the active one
?assertEqual(1, map_size(State4#rabbit_fifo.consumers)),
- ?assert(maps:is_key({<<"ctag4">>, self()}, State4#rabbit_fifo.consumers)),
+ ?assertMatch(#{C4 := _}, State4#rabbit_fifo.consumers),
% the waiting consumer list is now empty
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
- % there are some effects to unregister the consumer and to update the new active one (metrics)
- ?assertEqual(2, length(Effects3)),
+ % there are some effects to unregister the consumer and
+ % to update the new active one (metrics)
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects3),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects3),
% cancelling the last consumer
{State5, _, Effects4} = apply(meta(5),
- make_checkout({<<"ctag4">>, self()},
- cancel, #{}),
+ make_checkout(C4, cancel, #{}),
State4),
% no active consumer anymore
?assertEqual(0, map_size(State5#rabbit_fifo.consumers)),
% still nothing in the waiting list
?assertEqual(0, length(State5#rabbit_fifo.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects4)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, _}, Effects4),
ok.
@@ -673,6 +685,9 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
Pid2 = spawn(DummyFunction),
Pid3 = spawn(DummyFunction),
+ [C1, C2, C3, C4] = Consumers =
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}],
% adding some consumers
AddConsumer = fun({CTag, ChannelId}, State) ->
{NewState, _, _} = apply(
@@ -681,27 +696,34 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
State),
NewState
end,
- State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ State1 = lists:foldl(AddConsumer, State0, Consumers),
% the channel of the active consumer goes down
- {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, doesnotmatter}, State1),
+ {State2, _, Effects} = apply(#{index => 2}, {down, Pid1, noproc}, State1),
% fell back to another consumer
?assertEqual(1, map_size(State2#rabbit_fifo.consumers)),
% there are still waiting consumers
?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
% effects to unregister the consumer and
% to update the new active one (metrics) are there
- ?assertEqual(2, length(Effects)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C1, Effects),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects),
% the channel of the active consumer and a waiting consumer goes down
- {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, doesnotmatter}, State2),
+ {State3, _, Effects2} = apply(#{index => 3}, {down, Pid2, noproc}, State2),
% fell back to another consumer
?assertEqual(1, map_size(State3#rabbit_fifo.consumers)),
% no more waiting consumer
?assertEqual(0, length(State3#rabbit_fifo.waiting_consumers)),
% effects to cancel both consumers of this channel + effect to update the new active one (metrics)
- ?assertEqual(3, length(Effects2)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C2, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C3, Effects2),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ update_consumer_handler, _}, Effects2),
% the last channel goes down
{State4, _, Effects3} = apply(#{index => 4}, {down, Pid3, doesnotmatter}, State3),
@@ -709,48 +731,107 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
?assertEqual(0, map_size(State4#rabbit_fifo.consumers)),
?assertEqual(0, length(State4#rabbit_fifo.waiting_consumers)),
% there is an effect to unregister the consumer + queue inactive effect
- ?assertEqual(1 + 1, length(Effects3)),
+ ?ASSERT_EFF({mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [_, C]}, C == C4, Effects3),
ok.
-single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnection_test(_) ->
+single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
-
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
Meta = #{index => 1},
+ Nodes = [n1, n2, node()],
+ ConsumerIds = [C1 = {_, DownPid}, C2, _C3] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
% adding some consumers
- AddConsumer = fun(CTag, State) ->
- {NewState, _, _} = apply(
- Meta,
- make_checkout({CTag, self()},
- {once, 1, simple_prefetch}, #{}),
- State),
- NewState
- end,
- State1 = lists:foldl(AddConsumer, State0,
- [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]),
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}},
+ State1#rabbit_fifo.consumers),
% simulate node goes down
- {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1),
+ {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
- % all the waiting consumers should be suspected down
- ?assertEqual(3, length(State2#rabbit_fifo.waiting_consumers)),
- lists:foreach(fun({_, #consumer{status = Status}}) ->
- ?assert(Status == suspected_down)
- end, State2#rabbit_fifo.waiting_consumers),
+ %% assert a new consumer is in place and it is up
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State2#rabbit_fifo.consumers)),
+
+ %% the disconnected consumer has been returned to waiting
+ ?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
+ State2#rabbit_fifo.waiting_consumers)),
+ ?assertEqual(2, length(State2#rabbit_fifo.waiting_consumers)),
- % simulate node goes back up
- {State3, _, _} = apply(#{index => 2}, {nodeup, node(self())}, State2),
+ % simulate node comes back up
+ {State3, _, _} = apply(#{index => 2}, {nodeup, node(DownPid)}, State2),
- % all the waiting consumers should be un-suspected
- ?assertEqual(3, length(State3#rabbit_fifo.waiting_consumers)),
+ %% the consumer is still active and the same as before
+ ?assertMatch([{C2, #consumer{status = up}}],
+ maps:to_list(State3#rabbit_fifo.consumers)),
+ % the waiting consumers should be un-suspected
+ ?assertEqual(2, length(State3#rabbit_fifo.waiting_consumers)),
lists:foreach(fun({_, #consumer{status = Status}}) ->
?assert(Status /= suspected_down)
end, State3#rabbit_fifo.waiting_consumers),
+ ok.
+single_active_consumer_all_disconnected_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1, n2],
+ ConsumerIds = [C1 = {_, C1Pid}, C2 = {_, C2Pid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+
+ %% assert the consumer is up
+ ?assertMatch(#{C1 := #consumer{status = up}}, State1#rabbit_fifo.consumers),
+
+ % simulate node goes down
+ {State2, _, _} = apply(meta(5), {down, C1Pid, noconnection}, State1),
+ %% assert the consumer fails over to the consumer on n2
+ ?assertMatch(#{C2 := #consumer{status = up}}, State2#rabbit_fifo.consumers),
+ {State3, _, _} = apply(meta(6), {down, C2Pid, noconnection}, State2),
+ %% assert these no active consumer after both nodes are maked as down
+ ?assertMatch([], maps:to_list(State3#rabbit_fifo.consumers)),
+ %% n2 comes back
+ {State4, _, _} = apply(meta(7), {nodeup, node(C2Pid)}, State3),
+ %% ensure n2 is the active consumer as this node as been registered
+ %% as up again
+ ?assertMatch([{{<<"ctag_n2">>, _}, #consumer{status = up,
+ credit = 1}}],
+ maps:to_list(State4#rabbit_fifo.consumers)),
ok.
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
@@ -783,11 +864,11 @@ single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
?assertEqual(2 * 3 + 1, length(Effects)).
single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
+ Resource = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource => Resource,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -805,7 +886,8 @@ single_active_consumer_state_enter_eol_include_waiting_consumers_test(_) ->
NewState
end,
State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
Effects = rabbit_fifo:state_enter(eol, State1),
% 1 effect for each consumer process (channel process)
@@ -918,10 +1000,10 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_consumer_is_on_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -938,15 +1020,103 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
NewState
end,
State1 = lists:foldl(AddConsumer, State0,
- [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
+ [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
+ {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
{State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
- % only 1 effect to monitor the node
- ?assertEqual(1, length(Effects2)),
+ % one monitor and one consumer status update (deactivated)
+ ?assertEqual(3, length(Effects2)),
{_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
- ?assertEqual(4, length(Effects3)).
+ ?assertEqual(5, length(Effects3)).
+
+single_active_cancelled_with_unacked_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 1, simple_prefetch},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% enqueue 2 messages
+ {State2, _Effects2} = enq(3, 1, msg1, State1),
+ {State3, _Effects3} = enq(4, 2, msg2, State2),
+ %% one should be checked ou to C1
+ %% cancel C1
+ {State4, _, _} = apply(meta(5),
+ make_checkout(C1, cancel, #{}),
+ State3),
+ %% C2 should be the active consumer
+ ?assertMatch(#{C2 := #consumer{status = up,
+ checked_out = #{0 := _}}},
+ State4#rabbit_fifo.consumers),
+ %% C1 should be a cancelled consumer
+ ?assertMatch(#{C1 := #consumer{status = cancelled,
+ lifetime = once,
+ checked_out = #{0 := _}}},
+ State4#rabbit_fifo.consumers),
+ ?assertMatch([], State4#rabbit_fifo.waiting_consumers),
+
+ %% Ack both messages
+ {State5, _Effects5} = settle(C1, 1, 0, State4),
+ %% C1 should now be cancelled
+ {State6, _Effects6} = settle(C2, 2, 0, State5),
+
+ %% C2 should remain
+ ?assertMatch(#{C2 := #consumer{status = up}},
+ State6#rabbit_fifo.consumers),
+ %% C1 should be gone
+ ?assertNotMatch(#{C1 := _},
+ State6#rabbit_fifo.consumers),
+ ?assertMatch([], State6#rabbit_fifo.waiting_consumers),
+ ok.
+
+single_active_with_credited_test(_) ->
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+
+ C1 = {<<"ctag1">>, self()},
+ C2 = {<<"ctag2">>, self()},
+ % adding some consumers
+ AddConsumer = fun(C, S0) ->
+ {S, _, _} = apply(
+ meta(1),
+ make_checkout(C,
+ {auto, 0, credited},
+ #{}),
+ S0),
+ S
+ end,
+ State1 = lists:foldl(AddConsumer, State0, [C1, C2]),
+
+ %% add some credit
+ C1Cred = rabbit_fifo:make_credit(C1, 5, 0, false),
+ {State2, _, _Effects2} = apply(meta(3), C1Cred, State1),
+ C2Cred = rabbit_fifo:make_credit(C2, 4, 0, false),
+ {State3, _} = apply(meta(4), C2Cred, State2),
+ %% both consumers should have credit
+ ?assertMatch(#{C1 := #consumer{credit = 5}},
+ State3#rabbit_fifo.consumers),
+ ?assertMatch([{C2, #consumer{credit = 4}}],
+ State3#rabbit_fifo.waiting_consumers),
+ ok.
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index da72c030cd..c61d85fbff 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -271,21 +271,6 @@ scenario16(_Config) ->
delivery_limit => 1}, Commands),
ok.
-fake_pid(_Config) ->
- Pid = fake_external_pid(<<"mynode@banana">>),
- ?assertNotEqual(node(Pid), node()),
- ?assert(is_pid(Pid)),
- ok.
-
-fake_external_pid(Node) when is_binary(Node) ->
- ThisNodeSize = size(term_to_binary(node())) + 1,
- Pid = spawn(fun () -> ok end),
- %% drop the local node data from a local pid
- <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
- S = size(Node),
- %% replace it with the incoming node binary
- Final = <<131,103, 100, 0, S, Node/binary, LocalPidData/binary>>,
- binary_to_term(Final).
snapshots(_Config) ->
run_proper(
@@ -352,7 +337,7 @@ log_gen(Size) ->
pid_gen(Nodes) ->
?LET(Node, oneof(Nodes),
- fake_external_pid(atom_to_binary(Node, utf8))).
+ test_util:fake_pid(atom_to_binary(Node, utf8))).
down_gen(Pid) ->
?LET(E, {down, Pid, oneof([noconnection, noproc])}, E).
diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl
index 0b12f54c0b..c22a13c744 100644
--- a/test/single_active_consumer_SUITE.erl
+++ b/test/single_active_consumer_SUITE.erl
@@ -45,12 +45,14 @@ groups() ->
]}
].
-init_per_suite(Config) ->
+init_per_suite(Config0) ->
rabbit_ct_helpers:log_environment(),
- Config1 = rabbit_ct_helpers:set_config(Config, [
+ Config1 = rabbit_ct_helpers:set_config(Config0, [
{rmq_nodename_suffix, ?MODULE}
]),
- rabbit_ct_helpers:run_setup_steps(Config1,
+ Config = rabbit_ct_helpers:merge_app_env(
+ Config1, {rabbit, [{quorum_tick_interval, 1000}]}),
+ rabbit_ct_helpers:run_setup_steps(Config,
rabbit_ct_broker_helpers:setup_steps() ++
rabbit_ct_client_helpers:setup_steps()).
diff --git a/test/test_util.erl b/test/test_util.erl
new file mode 100644
index 0000000000..7fcf247898
--- /dev/null
+++ b/test/test_util.erl
@@ -0,0 +1,17 @@
+-module(test_util).
+
+-export([
+ fake_pid/1
+ ]).
+
+
+fake_pid(Node) ->
+ NodeBin = rabbit_data_coercion:to_binary(Node),
+ ThisNodeSize = size(term_to_binary(node())) + 1,
+ Pid = spawn(fun () -> ok end),
+ %% drop the local node data from a local pid
+ <<_:ThisNodeSize/binary, LocalPidData/binary>> = term_to_binary(Pid),
+ S = size(NodeBin),
+ %% replace it with the incoming node binary
+ Final = <<131,103, 100, S:16/unsigned, NodeBin/binary, LocalPidData/binary>>,
+ binary_to_term(Final).