summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lqueue.erl39
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_fifo.erl497
-rw-r--r--src/rabbit_fifo_client.erl51
-rw-r--r--src/rabbit_quorum_queue.erl73
-rw-r--r--src/truncate.erl127
-rw-r--r--test/quorum_queue_SUITE.erl14
-rw-r--r--test/rabbit_fifo_SUITE.erl50
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl101
-rw-r--r--test/unit_SUITE.erl70
10 files changed, 509 insertions, 542 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 1abe4e0b82..2b75ef4856 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -25,27 +25,32 @@
-define(QUEUE, queue).
--export_type([?MODULE/0]).
+-export_type([
+ ?MODULE/0,
+ ?MODULE/1
+ ]).
--opaque ?MODULE() :: {non_neg_integer(), queue:queue()}.
+-opaque ?MODULE() :: {non_neg_integer(), queue:queue(term())}.
+-opaque ?MODULE(T) :: {non_neg_integer(), queue:queue(T)}.
-type value() :: any().
--type result() :: 'empty' | {'value', value()}.
+-type result(T) :: 'empty' | {'value', T}.
--spec new() -> ?MODULE().
--spec drop(?MODULE()) -> ?MODULE().
--spec is_empty(?MODULE()) -> boolean().
--spec len(?MODULE()) -> non_neg_integer().
--spec in(value(), ?MODULE()) -> ?MODULE().
+-spec new() -> ?MODULE(_).
+-spec drop(?MODULE(T)) -> ?MODULE(T).
+-spec is_empty(?MODULE(_)) -> boolean().
+-spec len(?MODULE(_)) -> non_neg_integer().
+-spec in(T, ?MODULE(T)) -> ?MODULE(T).
-spec in_r(value(), ?MODULE()) -> ?MODULE().
--spec out(?MODULE()) -> {result(), ?MODULE()}.
--spec out_r(?MODULE()) -> {result(), ?MODULE()}.
--spec join(?MODULE(), ?MODULE()) -> ?MODULE().
--spec foldl(fun ((value(), B) -> B), B, ?MODULE()) -> B.
--spec foldr(fun ((value(), B) -> B), B, ?MODULE()) -> B.
--spec from_list([value()]) -> ?MODULE().
--spec to_list(?MODULE()) -> [value()].
--spec peek(?MODULE()) -> result().
--spec peek_r(?MODULE()) -> result().
+-spec out(?MODULE(T)) -> {result(T), ?MODULE()}.
+-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}.
+-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B).
+-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+-spec from_list([T]) -> ?MODULE(T).
+-spec to_list(?MODULE(T)) -> [T].
+% -spec peek(?MODULE()) -> result().
+-spec peek(?MODULE(T)) -> result(T).
+-spec peek_r(?MODULE(T)) -> result(T).
new() -> {0, ?QUEUE:new()}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 0f7d569a00..aa9889aef6 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -926,8 +926,12 @@ notify_policy_changed(#amqqueue{pid = QPid,
name = QName}) when ?IS_QUORUM(QPid) ->
rabbit_quorum_queue:policy_changed(QName, QPid).
-consumers(#amqqueue{ pid = QPid }) ->
- delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
+consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
+consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) ->
+ {ok, {_, Result}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_consumers/1),
+ maps:values(Result).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -1154,14 +1158,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP
[rabbit_misc:rs(QName), Reason])
end.
-basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, LimiterPid,
- LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid,
+ LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, QState) ->
ok = check_consume_arguments(QName, Args),
- case delegate:invoke(QPid, {gen_server2, call,
- [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
- Args, OkMsg, ActingUser}, infinity]}) of
+ case delegate:invoke(QPid,
+ {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg, ActingUser}, infinity]}) of
ok ->
{ok, QState};
Err ->
@@ -1171,15 +1176,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid,
_LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
_ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) ->
{error, global_qos_not_supported_for_queue_type};
-basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, NoAck, ChPid,
- _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag,
- ExclusiveConsume, Args, OkMsg, _ActingUser, QStates) ->
+basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q,
+ NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount,
+ ConsumerTag, ExclusiveConsume, Args, OkMsg,
+ ActingUser, QStates) ->
ok = check_consume_arguments(QName, Args),
QState0 = get_quorum_state(Id, QName, QStates),
{ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
ConsumerPrefetchCount,
ConsumerTag,
ExclusiveConsume, Args,
+ ActingUser,
OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 00d0db0b8a..2d5c267227 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -39,18 +39,23 @@
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
+ query_consumers/1,
usage/1,
%% misc
- dehydrate_state/1
+ dehydrate_state/1,
+
+ %% protocol helpers
+ make_enqueue/3,
+ make_checkout/3,
+ make_settle/2,
+ make_return/2,
+ make_discard/2,
+ make_credit/4,
+ make_purge/0,
+ make_update_state/1
]).
--ifdef(TEST).
--export([
- metrics_handler/1
- ]).
--endif.
-
-type raw_msg() :: term().
%% The raw message. It is opaque to rabbit_fifo.
@@ -99,19 +104,43 @@
{dequeue, settled | unsettled} |
cancel.
--type protocol() ::
- {enqueue, Sender :: maybe(pid()), MsgSeq :: maybe(msg_seqno()),
- Msg :: raw_msg()} |
- {checkout, Spec :: checkout_spec(), Consumer :: consumer_id()} |
- {settle, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {return, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {discard, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {credit,
- Credit :: non_neg_integer(),
- DeliveryCount :: non_neg_integer(),
- Drain :: boolean(),
- Consumer :: consumer_id()} |
- purge.
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% static meta data associated with a consumer
+
+%% command records representing all the protocol actions that are supported
+-record(enqueue, {pid :: maybe(pid()),
+ seq :: maybe(msg_seqno()),
+ msg :: raw_msg()}).
+-record(checkout, {consumer_id :: consumer_id(),
+ spec :: checkout_spec(),
+ meta :: consumer_meta()}).
+-record(settle, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(return, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(discard, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(credit, {consumer_id :: consumer_id(),
+ credit :: non_neg_integer(),
+ delivery_count :: non_neg_integer(),
+ drain :: boolean()}).
+-record(purge, {}).
+-record(update_state, {config :: config()}).
+
+
+
+-opaque protocol() ::
+ #enqueue{} |
+ #checkout{} |
+ #settle{} |
+ #return{} |
+ #discard{} |
+ #credit{} |
+ #purge{} |
+ #update_state{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -126,7 +155,8 @@
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
- {checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+ {meta = #{} :: consumer_meta(),
+ checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
next_msg_id = 0 :: msg_id(), % part of snapshot data
%% max number of messages that can be sent
%% decremented for each delivery
@@ -153,6 +183,7 @@
-record(state,
{name :: atom(),
+ queue_resource :: rabbit_types:r('queue'),
shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(),
% unassigned messages
messages = #{} :: #{msg_in_id() => indexed_msg()},
@@ -163,7 +194,7 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'),
+ returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -183,9 +214,7 @@
% needs to be part of snapshot
service_queue = queue:new() :: queue:queue(consumer_id()),
dead_letter_handler :: maybe(applied_mfa()),
- cancel_consumer_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
- metrics_handler :: maybe(applied_mfa()),
%% This is a special field that is only used for snapshots
%% It represents the number of queued messages at the time the
%% dehydrated snapshot state was cached.
@@ -203,16 +232,17 @@
-opaque state() :: #state{}.
-type config() :: #{name := atom(),
+ queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
- cancel_consumer_handler => applied_mfa(),
- metrics_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer()}.
-export_type([protocol/0,
delivery/0,
command/0,
+ credit_mode/0,
consumer_tag/0,
+ consumer_meta/0,
consumer_id/0,
client_msg/0,
msg/0,
@@ -222,20 +252,18 @@
state/0,
config/0]).
--spec init(config()) -> {state(), ra_machine:effects()}.
-init(#{name := Name} = Conf) ->
- update_state(Conf, #state{name = Name}).
+-spec init(config()) -> state().
+init(#{name := Name,
+ queue_resource := Resource} = Conf) ->
+ update_state(Conf, #state{name = Name,
+ queue_resource = Resource}).
update_state(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
- CCH = maps:get(cancel_consumer_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
- MH = maps:get(metrics_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
State#state{dead_letter_handler = DLH,
- cancel_consumer_handler = CCH,
become_leader_handler = BLH,
- metrics_handler = MH,
shadow_copy_interval = SHI}.
% msg_ids are scoped per consumer
@@ -243,7 +271,8 @@ update_state(Conf, State) ->
-spec apply(ra_machine:command_meta_data(), command(),
ra_machine:effects(), state()) ->
{state(), ra_machine:effects(), Reply :: term()}.
-apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
+apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects1} ->
{State, Effects, ok} = checkout(State0, Effects1),
@@ -251,7 +280,8 @@ apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
-apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0,
+apply(#{index := RaftIdx},
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
@@ -262,8 +292,8 @@ apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0,
_ ->
{State, Effects0, ok}
end;
-apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0,
- #state{consumers = Cons0} = State0) ->
+apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+ Effects0, #state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
{State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds,
@@ -274,7 +304,7 @@ apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0,
_ ->
{State0, Effects0, ok}
end;
-apply(_, {return, MsgIds, ConsumerId}, Effects0,
+apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
@@ -285,7 +315,8 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0,
_ ->
{State, Effects0, ok}
end;
-apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
+apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -328,25 +359,29 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
%% credit for unknown consumer - just ignore
{State0, Effects0, ok}
end;
-apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
+apply(_, #checkout{spec = {dequeue, _}}, Effects0,
#state{messages = M,
prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
%% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
-apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
+apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+ consumer_id = ConsumerId},
Effects0, State0) ->
% TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, {once, 1, simple_prefetch}, State0),
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
% turn send msg effect into reply
{success, _, MsgId, Msg, State2} = checkout_one(State1),
% immediately settle
- {State, Effects, _} = apply(Meta, {settle, [MsgId], ConsumerId},
+ {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]),
Effects0, State2),
{State, Effects, {dequeue, {MsgId, Msg}}};
-apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer},
+apply(_, #checkout{spec = {dequeue, unsettled},
+ meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
Effects0, State0) ->
- State1 = update_consumer(Consumer, {once, 1, simple_prefetch}, State0),
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
Effects1 = [{monitor, process, Pid} | Effects0],
{State, Reply, Effects} = case checkout_one(State1) of
{success, _, MsgId, Msg, S} ->
@@ -357,16 +392,17 @@ apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer},
{S, empty, Effects1}
end,
{State, Effects, {dequeue, Reply}};
-apply(_, {checkout, cancel, ConsumerId}, Effects0, State0) ->
+apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) ->
{CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}),
% TODO: here we should really demonitor the pid but _only_ if it has no
% other consumers or enqueuers.
checkout(State1, CancelEffects);
-apply(_, {checkout, Spec, {_Tag, Pid} = ConsumerId}, Effects0, State0) ->
- State1 = update_consumer(ConsumerId, Spec, State0),
+apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId},
+ Effects0, State0) ->
+ State1 = update_consumer(ConsumerId, Meta, Spec, State0),
{State, Effects, Res} = checkout(State1, Effects0),
{State, [{monitor, process, Pid} | Effects], Res};
-apply(#{index := RaftIdx}, purge, Effects0,
+apply(#{index := RaftIdx}, #purge{}, Effects0,
#state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
Total = rabbit_fifo_index:size(Indexes),
{State1, Effects1, _} =
@@ -470,7 +506,7 @@ apply(_, {nodeup, Node}, Effects0,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok};
-apply(_, {update_state, Conf}, Effects, State) ->
+apply(_, #update_state{config = Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
@@ -504,21 +540,17 @@ state_enter(_, _) ->
-spec tick(non_neg_integer(), state()) -> ra_machine:effects().
tick(_Ts, #state{name = Name,
+ queue_resource = QName,
messages = Messages,
ra_indexes = Indexes,
- metrics_handler = MH,
consumers = Cons} = State) ->
Metrics = {Name,
maps:size(Messages), % Ready
num_checked_out(State), % checked out
rabbit_fifo_index:size(Indexes), %% Total
maps:size(Cons)}, % Consumers
- case MH of
- undefined ->
- [{aux, emit}];
- {Mod, Fun, Args} ->
- [{mod_call, Mod, Fun, Args ++ [Metrics]}, {aux, emit}]
- end.
+ [{mod_call, rabbit_quorum_queue,
+ update_metrics, [QName, Metrics]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#state{consumers = Cons,
@@ -582,6 +614,14 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
query_consumer_count(#state{consumers = Consumers}) ->
maps:size(Consumers).
+query_consumers(#state{consumers = Consumers}) ->
+ maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)}
+ end, Consumers).
%% other
-spec usage(atom()) -> float().
@@ -627,11 +667,11 @@ num_checked_out(#state{consumers = Cons}) ->
end, 0, maps:values(Cons)).
cancel_consumer(ConsumerId,
- {Effects0, #state{consumers = C0, name = Name} = S0}) ->
+ {Effects0, #state{consumers = C0} = S0}) ->
case maps:take(ConsumerId, C0) of
{#consumer{checked_out = Checked0}, Cons} ->
S = return_all(S0, Checked0),
- Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0),
+ Effects = cancel_consumer_effects(ConsumerId, S, Effects0),
case maps:size(Cons) of
0 ->
{[{aux, inactive} | Effects], S#state{consumers = Cons}};
@@ -787,13 +827,9 @@ dead_letter_effects(Discarded,
end, [], Discarded),
[{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects].
-cancel_consumer_effects(_, _, #state{cancel_consumer_handler = undefined},
- Effects) ->
- Effects;
-cancel_consumer_effects(Pid, Name,
- #state{cancel_consumer_handler = {Mod, Fun, Args}},
- Effects) ->
- [{mod_call, Mod, Fun, Args ++ [Pid, Name]} | Effects].
+cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) ->
+ [{mod_call, rabbit_quorum_queue,
+ cancel_consumer_handler, [QName, ConsumerId]} | Effects].
update_smallest_raft_index(IncomingRaftIdx, OldIndexes,
#state{ra_indexes = Indexes,
@@ -1017,11 +1053,12 @@ uniq_queue_in(Key, Queue) ->
end.
-update_consumer(ConsumerId, {Life, Credit, Mode},
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
- Init = #consumer{lifetime = Life, credit = Credit, credit_mode = Mode},
+ Init = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
fun(S) ->
%% remove any in-flight messages from
@@ -1074,6 +1111,41 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
Con#consumer{checked_out = Checked}.
+-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
+make_enqueue(Pid, Seq, Msg) ->
+ #enqueue{pid = Pid, seq = Seq, msg = Msg}.
+-spec make_checkout(consumer_id(),
+ checkout_spec(), consumer_meta()) -> protocol().
+make_checkout(ConsumerId, Spec, Meta) ->
+ #checkout{consumer_id = ConsumerId,
+ spec = Spec, meta = Meta}.
+
+-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
+make_settle(ConsumerId, MsgIds) ->
+ #settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_return(consumer_id(), [msg_id()]) -> protocol().
+make_return(ConsumerId, MsgIds) ->
+ #return{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
+make_discard(ConsumerId, MsgIds) ->
+ #discard{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(),
+ boolean()) -> protocol().
+make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
+ #credit{consumer_id = ConsumerId,
+ credit = Credit,
+ delivery_count = DeliveryCount,
+ drain = Drain}.
+
+-spec make_purge() -> protocol().
+make_purge() -> #purge{}.
+
+-spec make_update_state(config()) -> protocol().
+make_update_state(Config) ->
+ #update_state{config = Config}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1098,18 +1170,17 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
test_init(Name) ->
init(#{name => Name,
- shadow_copy_interval => 0,
- metrics_handler => {?MODULE, metrics_handler, []}}).
-
-metrics_handler(_) ->
- ok.
+ queue_resource => queue_resource,
+ shadow_copy_interval => 0}).
enq_enq_checkout_test() ->
Cid = {<<"enq_enq_checkout_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
{_State3, Effects, _} =
- apply(meta(3), {checkout, {once, 2, simple_prefetch}, Cid}, [], State2),
+ apply(meta(3),
+ make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+ [], State2),
?ASSERT_EFF({monitor, _, _}, Effects),
?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
ok.
@@ -1119,7 +1190,8 @@ credit_enq_enq_checkout_settled_credit_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
{State3, Effects, _} =
- apply(meta(3), {checkout, {auto, 1, credited}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}),
+ [], State2),
?ASSERT_EFF({monitor, _, _}, Effects),
Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
(_) -> false
@@ -1149,12 +1221,13 @@ credit_with_drained_test() ->
State0 = test_init(test),
%% checkout with a single credit
{State1, _, _} =
- apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0),
+ apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
+ [], State0),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
{State, _Effs, Result} =
- apply(meta(3), {credit, 0, 5, true, Cid}, [], State1),
+ apply(meta(3), make_credit(Cid, 0, 5, true), [], State1),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
@@ -1169,12 +1242,13 @@ credit_and_drain_test() ->
{State2, _} = enq(2, 2, second, State1),
%% checkout without any initial credit (like AMQP 1.0 would)
{State3, CheckEffs, _} =
- apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
+ [], State2),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
{State4, Effects, {multi, [{send_credit_reply, 0},
{send_drained, [{?FUNCTION_NAME, 2}]}]}} =
- apply(meta(4), {credit, 4, 0, true, Cid}, [], State3),
+ apply(meta(4), make_credit(Cid, 4, 0, true), [], State3),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -1193,7 +1267,8 @@ enq_enq_deq_test() ->
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
{_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State2),
ok.
enq_enq_deq_deq_settle_test() ->
@@ -1202,9 +1277,11 @@ enq_enq_deq_deq_settle_test() ->
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
{State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State2),
{_State4, _Effects4, {dequeue, empty}} =
- apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State3),
ok.
enq_enq_checkout_get_settled_test() ->
@@ -1212,23 +1289,28 @@ enq_enq_checkout_get_settled_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
% get returns a reply value
{_State2, _Effects, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
checkout_get_empty_test() ->
Cid = {?FUNCTION_NAME, self()},
State = test_init(test),
{_State2, [], {dequeue, empty}} =
- apply(meta(1), {checkout, {dequeue, unsettled}, Cid}, [], State),
+ apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State),
ok.
untracked_enq_deq_test() ->
Cid = {?FUNCTION_NAME, self()},
State0 = test_init(test),
- {State1, _, _} = apply(meta(1), {enqueue, undefined, undefined, first}, [], State0),
+ {State1, _, _} = apply(meta(1),
+ make_enqueue(undefined, undefined, first), [], State0),
{_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
+
release_cursor_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
@@ -1299,7 +1381,7 @@ return_non_existent_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
% return non-existent
- {_State2, [], _} = apply(meta(3), {return, [99], Cid}, [], State0),
+ {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0),
ok.
return_checked_out_test() ->
@@ -1309,7 +1391,8 @@ return_checked_out_test() ->
{send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} =
check(Cid, 2, State0),
% return
- {_State2, [_, _], _} = apply(meta(3), {return, [MsgId], Cid}, [], State1),
+ {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]),
+ [], State1),
ok.
return_auto_checked_out_test() ->
@@ -1322,7 +1405,7 @@ return_auto_checked_out_test() ->
{send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} =
check_auto(Cid, 2, State0),
% return should include another delivery
- {_State2, Effects, _} = apply(meta(3), {return, [MsgId], Cid}, [], State1),
+ {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1),
?ASSERT_EFF({send_msg, _,
{delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
Effects),
@@ -1335,19 +1418,22 @@ cancelled_checkout_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
- {State2, _, _} = apply(meta(3), {checkout, cancel, Cid}, [], State1),
+ {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}),
+ [], State1),
+ ?assertEqual(2, maps:size(State2#state.messages)),
{State3, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2),
+ ?debugFmt("State3 ~p", [State3]),
{_State, _, {dequeue, {_, {_, second}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
Cid = {<<"down_consumer_returns_unsettled_test">>, self()},
{State0, [_, _]} = enq(1, 1, second, test_init(test)),
{State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0),
- {State2, [_, _], _} = apply(meta(3), {down, Pid, noproc}, [], State1),
+ {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1),
{_State, Effects} = check(Cid, 4, State2),
?ASSERT_EFF({monitor, process, _}, Effects),
ok.
@@ -1416,7 +1502,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1),
+ {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -1425,6 +1511,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()},
State00 = init(#{name => test,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
dead_letter_handler =>
{somemod, somefun, [somearg]}}),
{State0, [_, _]} = enq(1, 1, first, State00),
@@ -1432,7 +1519,8 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1),
+ {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]),
+ [], State1),
% assert mod call effect with appended reason and message
?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
Effects2),
@@ -1445,25 +1533,23 @@ tick_test() ->
{S1, _} = enq(2, 2, snd, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3),
+ {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
- [{mod_call, _, _, [{test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
+ [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
ok.
enq_deq_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
- % OthPid = spawn(fun () -> ok end),
- % Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, settled}, Cid},
- {enqueue, self(), 3, three},
- {enqueue, self(), 4, four},
- {checkout, {dequeue, settled}, Cid},
- {enqueue, self(), 5, five},
- {checkout, {dequeue, settled}, Cid}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, settled}, #{}),
+ make_enqueue(self(), 3, three),
+ make_enqueue(self(), 4, four),
+ make_checkout(Cid, {dequeue, settled}, #{}),
+ make_enqueue(self(), 5, five),
+ make_checkout(Cid, {dequeue, settled}, #{})
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1473,10 +1559,10 @@ enq_deq_settle_snapshot_recover_test() ->
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Cid}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Cid, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1486,13 +1572,13 @@ enq_deq_settle_snapshot_recover_2_test() ->
OthPid = spawn(fun () -> ok end),
Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, two},
- {checkout, {dequeue, unsettled}, Oth},
- {settle, [0], Oth}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Oth, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1500,11 +1586,11 @@ snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {enqueue, self(), 3, three},
- purge
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_enqueue(self(), 3, three),
+ make_purge()
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1512,12 +1598,12 @@ enq_deq_return_settle_snapshot_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one}, %% to Cid
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {return, [0], Cid}, %% should be re-delivered to Cid
- {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
- {settle, [1], Cid},
- {settle, [2], Cid}
+ make_enqueue(self(), 1, one), %% to Cid
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_return(Cid, [0]), %% should be re-delivered to Cid
+ make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
+ make_settle(Cid, [1]),
+ make_settle(Cid, [2])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1525,10 +1611,10 @@ return_prefix_msg_count_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one},
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {checkout, cancel, Cid},
- {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ make_enqueue(self(), 1, one),
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_checkout(Cid, cancel, #{}),
+ make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
@@ -1541,16 +1627,16 @@ return_settle_snapshot_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one}, %% to Cid
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {return, [0], Cid}, %% should be re-delivered to Oth
- {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
- {settle, [1], Cid},
- {return, [2], Cid},
- {settle, [3], Cid},
- {enqueue, self(), 3, three},
- purge,
- {enqueue, self(), 4, four}
+ make_enqueue(self(), 1, one), %% to Cid
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_return(Cid, [0]), %% should be re-delivered to Oth
+ make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
+ make_settle(Cid, [1]),
+ make_return(Cid, [2]),
+ make_settle(Cid, [3]),
+ make_enqueue(self(), 3, three),
+ make_purge(),
+ make_enqueue(self(), 4, four)
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1558,13 +1644,13 @@ enq_check_settle_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {settle, [1], Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, three},
- {settle, [2], Cid}
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, three),
+ make_settle(Cid, [2])
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1573,13 +1659,13 @@ enq_check_settle_snapshot_purge_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {settle, [1], Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, three},
- purge
+ make_checkout(Cid, {auto, 2, simple_prefetch},#{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, three),
+ make_purge()
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1589,38 +1675,18 @@ enq_check_settle_duplicate_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one}, %% 0
- {enqueue, self(), 2, two}, %% 0
- {settle, [0], Cid},
- {settle, [1], Cid},
- {settle, [1], Cid},
- {enqueue, self(), 3, three},
- {settle, [2], Cid}
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one), %% 0
+ make_enqueue(self(), 2, two), %% 0
+ make_settle(Cid, [0]),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [1]),
+ make_enqueue(self(), 3, three),
+ make_settle(Cid, [2])
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
-
-multi_return_snapshot_test() ->
- %% this was discovered using property testing
- C1 = {<<>>, c:pid(0,6723,1)},
- C2 = {<<0>>,c:pid(0,6723,1)},
- E = c:pid(0,6720,1),
- Commands = [
- {checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg},
- {enqueue,E,2,msg},
- {checkout,cancel,C1}, %% both on returns queue
- {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
- {return,[0],C2}, %% E1 in returns, E2 with C2
- {return,[1],C2}, %% E2 in returns E1 with C2
- {settle,[2],C2} %% E2 with C2
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands),
- ok.
-
-
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
@@ -1657,11 +1723,11 @@ delivery_query_returns_deliveries_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 5, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {enqueue, self(), 3, tre},
- {enqueue, self(), 4, for}
+ make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_enqueue(self(), 3, tre),
+ make_enqueue(self(), 4, for)
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
@@ -1677,7 +1743,8 @@ pending_enqueue_is_enqueued_on_down_test() ->
{State0, _} = enq(1, 2, first, test_init(test)),
{State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0),
{_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
duplicate_delivery_test() ->
@@ -1690,6 +1757,7 @@ duplicate_delivery_test() ->
state_enter_test() ->
S0 = init(#{name => the_name,
+ queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>),
become_leader_handler => {m, f, [a]}}),
[{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0),
ok.
@@ -1721,11 +1789,11 @@ state_enter_montors_and_notifications_test() ->
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _, {purge, 1}} = apply(meta(2), purge, [], State1),
+ {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
{_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} =
- apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3),
ok.
purge_with_checkout_test() ->
@@ -1733,7 +1801,7 @@ purge_with_checkout_test() ->
{State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
{State1, _} = enq(2, 1, first, State0),
{State2, _} = enq(3, 2, second, State1),
- {State3, _, {purge, 2}} = apply(meta(2), purge, [], State2),
+ {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
?assertEqual(0, maps:size(Checked)),
ok.
@@ -1743,36 +1811,47 @@ meta(Idx) ->
enq(Idx, MsgSeq, Msg, State) ->
strip_reply(
- apply(meta(Idx), {enqueue, self(), MsgSeq, Msg}, [], State)).
+ apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)).
deq(Idx, Cid, Settlement, State0) ->
{State, _, {dequeue, Msg}} =
- apply(meta(Idx), {checkout, {dequeue, Settlement}, Cid}, [], State0),
+ apply(meta(Idx),
+ make_checkout(Cid, {dequeue, Settlement}, #{}),
+ [], State0),
{State, Msg}.
check_n(Cid, Idx, N, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {auto, N, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+ [], State)).
check(Cid, Idx, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {once, 1, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+ [], State)).
check_auto(Cid, Idx, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {auto, 1, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ [], State)).
check(Cid, Idx, Num, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {once, Num, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
+ [], State)).
settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), {settle, [MsgId], Cid}, [], State)).
+ strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)).
credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
- strip_reply(apply(meta(Idx), {credit, Credit, DelCnt, Drain, Cid}, [], State)).
+ strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
+ [], State)).
-strip_reply({State, Effects, _Replu}) ->
+strip_reply({State, Effects, _Reply}) ->
{State, Effects}.
run_log(InitState, Entries) ->
@@ -1789,7 +1868,9 @@ run_log(InitState, Entries) ->
aux_test() ->
_ = ra_machine_ets:start_link(),
Aux0 = init_aux(aux_test),
- MacState = init(#{name => aux_test}),
+ MacState = init(#{name => aux_test,
+ queue_resource =>
+ rabbit_misc:r(<<"/">>, queue, <<"test">>)}),
Log = undefined,
{no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0,
Log, MacState),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 635d85be4a..9cdb1dfbe7 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -24,8 +24,8 @@
init/2,
init/3,
init/5,
- checkout/3,
checkout/4,
+ checkout/5,
cancel_checkout/2,
enqueue/2,
enqueue/3,
@@ -42,6 +42,7 @@
]).
-include_lib("ra/include/ra.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
@@ -51,7 +52,7 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
--record(consumer, {last_msg_id :: seq(),
+-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
-record(state, {cluster_name :: ra_cluster_name(),
@@ -98,7 +99,7 @@ init(ClusterName, Servers) ->
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
-init(ClusterName, Servers, SoftLimit) ->
+init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
servers = Servers,
@@ -107,7 +108,7 @@ init(ClusterName, Servers, SoftLimit) ->
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
-init(ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) ->
+init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
servers = Servers,
@@ -135,7 +136,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
Node = pick_node(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
- Cmd = {enqueue, self(), Next, Msg},
+ Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
case send_command(Node, Correlation, Cmd, low, State1) of
{slow, _} = Ret when not Slow ->
BlockFun(),
@@ -177,8 +178,11 @@ enqueue(Msg, State) ->
dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
Node = pick_node(State0),
ConsumerId = consumer_id(ConsumerTag),
- case ra:process_command(Node, {checkout, {dequeue, Settlement},
- ConsumerId}, Timeout) of
+ case ra:process_command(Node,
+ rabbit_fifo:make_checkout(ConsumerId,
+ {dequeue, Settlement},
+ #{}),
+ Timeout) of
{ok, {dequeue, Reply}, Leader} ->
{ok, Reply, State0#state{leader = Leader}};
Err ->
@@ -198,7 +202,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
- Cmd = {settle, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -232,7 +236,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
% TODO: make rabbit_fifo return support lists of message ids
- Cmd = {return, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -265,7 +269,7 @@ return(ConsumerTag, [_|_] = MsgIds,
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
- Cmd = {discard, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -299,9 +303,10 @@ discard(ConsumerTag, [_|_] = MsgIds,
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
+ rabbit_fifo:consumer_meta(),
state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, State0) ->
- checkout(ConsumerTag, NumUnsettled, simple_prefetch, State0).
+checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) ->
+ checkout(ConsumerTag, NumUnsettled, simple_prefetch, ConsumerInfo, State0).
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
@@ -319,13 +324,17 @@ checkout(ConsumerTag, NumUnsettled, State0) ->
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
+-spec checkout(rabbit_fifo:consumer_tag(),
+ NumUnsettled :: non_neg_integer(),
CreditMode :: rabbit_fifo:credit_mode(),
+ Meta :: rabbit_fifo:consumer_meta(),
state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, CreditMode, State0) ->
+checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, State0) ->
Servers = sorted_servers(State0),
ConsumerId = {ConsumerTag, self()},
- Cmd = {checkout, {auto, NumUnsettled, CreditMode}, ConsumerId},
+ Cmd = rabbit_fifo:make_checkout(ConsumerId,
+ {auto, NumUnsettled, CreditMode},
+ Meta),
try_process_command(Servers, Cmd, State0).
%% @doc Provide credit to the queue
@@ -348,8 +357,8 @@ credit(ConsumerTag, Credit, Drain,
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
Node = pick_node(State0),
- Cmd = {credit, Credit, C#consumer.last_msg_id + 1, Drain, ConsumerId},
- ct:pal("sending credit ~w", [Cmd]),
+ Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
+ C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -372,7 +381,7 @@ credit(ConsumerTag, Credit, Drain,
cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
Servers = sorted_servers(State0),
ConsumerId = {ConsumerTag, self()},
- Cmd = {checkout, cancel, ConsumerId},
+ Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)},
try_process_command(Servers, Cmd, State).
@@ -380,7 +389,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
%% of messages purged.
-spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Node) ->
- case ra:process_command(Node, purge) of
+ case ra:process_command(Node, rabbit_fifo:make_purge()) of
{ok, {purge, Reply}, _} ->
{ok, Reply};
Err ->
@@ -393,7 +402,7 @@ cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
- case ra:process_command(Node, {update_state, Conf}) of
+ case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -520,7 +529,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) ->
-spec untracked_enqueue([ra_server_id()], term()) ->
ok.
untracked_enqueue([Node | _], Msg) ->
- Cmd = {enqueue, undefined, undefined, Msg},
+ Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
ok = ra:pipeline_command(Node, Cmd),
ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 5e854a4657..bb8af13b9d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -19,14 +19,14 @@
-export([init_state/2, handle_event/2]).
-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
--export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]).
-export([credit/4]).
-export([purge/1]).
-export([stateless_deliver/2, deliver/3]).
-export([dead_letter_publish/4]).
-export([queue_name/1]).
-export([cluster_state/1, status/2]).
--export([cancel_consumer_handler/3, cancel_consumer/3]).
+-export([cancel_consumer_handler/2, cancel_consumer/3]).
-export([become_leader/2, update_metrics/2]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
@@ -47,12 +47,9 @@
-spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
{'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
{rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
--spec declare(rabbit_types:amqqueue()) -> {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()}.
-spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() |
{'absent', rabbit_types:amqqueue(), atom()}].
-spec stop(rabbit_types:vhost()) -> 'ok'.
--spec delete(rabbit_types:amqqueue(), boolean(), boolean(), rabbit_types:username()) ->
- {'ok', QLen :: non_neg_integer()}.
-spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
@@ -61,15 +58,9 @@
rabbit_fifo_client:state()) ->
{'ok', 'empty', rabbit_fifo_client:state()} |
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
--spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
- ConsumerPrefetchCount :: non_neg_integer(), rabbit_types:ctag(),
- ExclusiveConsume :: boolean(), Args :: rabbit_framing:amqp_table(),
- any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
--spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) ->
- rabbit_fifo_client:state().
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
@@ -95,7 +86,7 @@
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
rabbit_fifo_client:state().
-init_state({Name, _}, QName) ->
+init_state({Name, _}, QName = #resource{}) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
@@ -104,13 +95,16 @@ init_state({Name, _}, QName) ->
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
+ rabbit_fifo_client:init(QName, Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
+-spec declare(rabbit_types:amqqueue()) ->
+ {'new', rabbit_types:amqqueue()} |
+ {existing, rabbit_types:amqqueue()}.
declare(#amqqueue{name = QName,
durable = Durable,
auto_delete = AutoDelete,
@@ -150,20 +144,17 @@ declare(#amqqueue{name = QName,
Ex
end.
-
-
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
ra_machine_config(Q = #amqqueue{name = QName}) ->
#{dead_letter_handler => dlx_mfa(Q),
- cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]},
+ queue_resource => QName,
become_leader_handler => {?MODULE, become_leader, [QName]},
metrics_handler => {?MODULE, update_metrics, [QName]}}.
-cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) ->
+cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
Node = node(ChPid),
- % QName = queue_name(Name),
case Node == node() of
true -> cancel_consumer(QName, ChPid, ConsumerTag);
false ->
@@ -247,9 +238,9 @@ recover(Queues) ->
RaNodes = [{Name, Node} || Node <- Nodes],
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
ok -> ok;
- Err ->
+ Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
- " be started ~w", [Name, Err]),
+ " be started ~w", [Name, Err2]),
ok
end;
{error, {already_started, _}} ->
@@ -275,7 +266,12 @@ stop(VHost) ->
_ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
ok.
-delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
+-spec delete(rabbit_types:amqqueue(),
+ boolean(), boolean(),
+ rabbit_types:username()) ->
+ {ok, QLen :: non_neg_integer()}.
+delete(#amqqueue{type = quorum, pid = {Name, _},
+ name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -345,17 +341,35 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
{error, timeout}
end.
+-spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
+ ConsumerPrefetchCount :: non_neg_integer(),
+ rabbit_types:ctag(), ExclusiveConsume :: boolean(),
+ Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
+ any(), rabbit_fifo_client:state()) ->
+ {'ok', rabbit_fifo_client:state()}.
basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg,
- QState0) ->
+ ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
+ ActingUser, OkMsg, QState0) ->
+ %% TODO: validate consumer arguments
+ %% currently quorum queues do not support any arguments
maybe_send_reply(ChPid, OkMsg),
- %% A prefetch count of 0 means no limitation, let's make it into something large for ra
+ ConsumerTag = quorum_ctag(ConsumerTag0),
+ %% A prefetch count of 0 means no limitation,
+ %% let's make it into something large for ra
Prefetch = case ConsumerPrefetchCount of
0 -> 2000;
Other -> Other
end,
- {ok, QState} = rabbit_fifo_client:checkout(quorum_ctag(ConsumerTag),
- Prefetch, QState0),
+ %% consumer info is used to describe the consumer properties
+ ConsumerMeta = #{ack => not NoAck,
+ prefetch => ConsumerPrefetchCount,
+ args => Args,
+ username => ActingUser},
+ {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag,
+ Prefetch,
+ ConsumerMeta,
+ QState0),
+ %% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount, Args),
@@ -369,6 +383,9 @@ stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
Delivery#delivery.message).
+-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
+ rabbit_fifo_client:state()) ->
+ {ok | slow, rabbit_fifo_client:state()}.
deliver(false, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
deliver(true, Delivery, QState0) ->
@@ -404,8 +421,8 @@ cleanup_data_dir() ->
<- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
- [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
- not lists:member(Name, Names)],
+ _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
+ not lists:member(Name, Names)],
ok.
maybe_delete_data_dir(UId) ->
diff --git a/src/truncate.erl b/src/truncate.erl
deleted file mode 100644
index 034bc62fa8..0000000000
--- a/src/truncate.erl
+++ /dev/null
@@ -1,127 +0,0 @@
-%% The contents of this file are subject to the Mozilla Public License
-%% Version 1.1 (the "License"); you may not use this file except in
-%% compliance with the License. You may obtain a copy of the License
-%% at http://www.mozilla.org/MPL/
-%%
-%% Software distributed under the License is distributed on an "AS IS"
-%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
-%% the License for the specific language governing rights and
-%% limitations under the License.
-%%
-%% The Original Code is RabbitMQ.
-%%
-%% The Initial Developer of the Original Code is GoPivotal, Inc.
-%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved.
-%%
-
--module(truncate).
-
--define(ELLIPSIS_LENGTH, 3).
-
--record(params, {content, struct, content_dec, struct_dec}).
-
--export([log_event/2, term/2]).
-
--ifdef(TEST).
--export([term_size/3]).
--endif.
-
-log_event({Type, GL, {Pid, Format, Args}}, Params)
- when Type =:= error orelse
- Type =:= info_msg orelse
- Type =:= warning_msg ->
- {Type, GL, {Pid, Format, [term(T, Params) || T <- Args]}};
-log_event({Type, GL, {Pid, ReportType, Report}}, Params)
- when Type =:= error_report orelse
- Type =:= info_report orelse
- Type =:= warning_report ->
- {Type, GL, {Pid, ReportType, report(Report, Params)}};
-log_event(Event, _Params) ->
- Event.
-
-report([[Thing]], Params) -> report([Thing], Params);
-report(List, Params) when is_list(List) -> [case Item of
- {K, V} -> {K, term(V, Params)};
- _ -> term(Item, Params)
- end || Item <- List];
-report(Other, Params) -> term(Other, Params).
-
-term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) ->
- case exceeds_size(Thing, Max) of
- true -> term(Thing, true, #params{content = Content,
- struct = Struct,
- content_dec = ContentDec,
- struct_dec = StructDec});
- false -> Thing
- end.
-
-term(Bin, _AllowPrintable, #params{content = N})
- when (is_binary(Bin) orelse is_bitstring(Bin))
- andalso size(Bin) > N - ?ELLIPSIS_LENGTH ->
- Suffix = without_ellipsis(N),
- <<Head:Suffix/binary, _/bitstring>> = Bin,
- <<Head/binary, <<"...">>/binary>>;
-term(L, AllowPrintable, #params{struct = N} = Params) when is_list(L) ->
- case AllowPrintable andalso io_lib:printable_list(L) of
- true -> N2 = without_ellipsis(N),
- case length(L) > N2 of
- true -> string:left(L, N2) ++ "...";
- false -> L
- end;
- false -> shrink_list(L, Params)
- end;
-term(T, _AllowPrintable, Params) when is_tuple(T) ->
- list_to_tuple(shrink_list(tuple_to_list(T), Params));
-term(T, _, _) ->
- T.
-
-without_ellipsis(N) -> erlang:max(N - ?ELLIPSIS_LENGTH, 0).
-
-shrink_list(_, #params{struct = N}) when N =< 0 ->
- ['...'];
-shrink_list([], _) ->
- [];
-shrink_list([H|T], #params{content = Content,
- struct = Struct,
- content_dec = ContentDec,
- struct_dec = StructDec} = Params) ->
- [term(H, true, Params#params{content = Content - ContentDec,
- struct = Struct - StructDec})
- | term(T, false, Params#params{struct = Struct - 1})].
-
-%%----------------------------------------------------------------------------
-
-%% We don't use erts_debug:flat_size/1 because that ignores binary
-%% sizes. This is all going to be rather approximate though, these
-%% sizes are probably not very "fair" but we are just trying to see if
-%% we reach a fairly arbitrary limit anyway though.
-exceeds_size(Thing, Max) ->
- case term_size(Thing, Max, erlang:system_info(wordsize)) of
- limit_exceeded -> true;
- _ -> false
- end.
-
-term_size(B, M, _W) when is_bitstring(B) -> lim(M, size(B));
-term_size(A, M, W) when is_atom(A) -> lim(M, 2 * W);
-term_size(N, M, W) when is_number(N) -> lim(M, 2 * W);
-term_size(T, M, W) when is_tuple(T) -> tuple_term_size(
- T, M, 1, tuple_size(T), W);
-term_size([], M, _W) ->
- M;
-term_size([H|T], M, W) ->
- case term_size(H, M, W) of
- limit_exceeded -> limit_exceeded;
- M2 -> lim(term_size(T, M2, W), 2 * W)
- end;
-term_size(X, M, W) ->
- lim(M, erts_debug:flat_size(X) * W).
-
-lim(S, T) when is_number(S) andalso S > T -> S - T;
-lim(_, _) -> limit_exceeded.
-
-tuple_term_size(_T, limit_exceeded, _I, _S, _W) ->
- limit_exceeded;
-tuple_term_size(_T, M, I, S, _W) when I > S ->
- M;
-tuple_term_size(T, M, I, S, W) ->
- tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W).
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 5b87c5be20..c94fb9ddab 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -709,6 +709,7 @@ subscribe(Config) ->
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ qos(Ch, 10, false),
RaName = ra_name(QQ),
publish(Ch, QQ),
wait_for_messages_ready(Servers, RaName, 1),
@@ -717,6 +718,14 @@ subscribe(Config) ->
receive_basic_deliver(false),
wait_for_messages_ready(Servers, RaName, 0),
wait_for_messages_pending_ack(Servers, RaName, 1),
+ %% validate we can retrieve the consumers
+ [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]),
+ ct:pal("Consumer ~p", [Consumer]),
+ ?assert(is_pid(proplists:get_value(channel_pid, Consumer))),
+ ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))),
+ ?assertEqual(true, proplists:get_value(ack_required, Consumer)),
+ ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)),
+ ?assertEqual([], proplists:get_value(arguments, Consumer)),
rabbit_ct_client_helpers:close_channel(Ch),
wait_for_messages_ready(Servers, RaName, 1),
wait_for_messages_pending_ack(Servers, RaName, 0).
@@ -1507,7 +1516,10 @@ basic_cancel(Config) ->
wait_for_messages_pending_ack(Servers, RaName, 1),
amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}),
wait_for_messages_ready(Servers, RaName, 1),
- wait_for_messages_pending_ack(Servers, RaName, 0)
+ wait_for_messages_pending_ack(Servers, RaName, 0),
+ [] = rpc:call(Server, ets, tab2list, [consumer_created])
+ after 5000 ->
+ exit(basic_deliver_timeout)
end.
purge(Config) ->
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index 56608e9af3..3263a733a9 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -4,6 +4,7 @@
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
all() ->
[
@@ -50,11 +51,16 @@ end_per_group(_, Config) ->
Config.
init_per_testcase(TestCase, Config) ->
+ meck:new(rabbit_quorum_queue, [passthrough]),
+ meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
+ fun (_, _) -> ok end),
ra_server_sup:remove_all(),
ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"),
ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"),
+ ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)),
[
- {cluster_name, TestCase},
+ {cluster_name, ClusterName},
{uid, atom_to_binary(TestCase, utf8)},
{node_id, {TestCase, node()}},
{uid2, atom_to_binary(ServerName2, utf8)},
@@ -63,6 +69,10 @@ init_per_testcase(TestCase, Config) ->
{node_id3, {ServerName3, node()}}
| Config].
+end_per_testcase(_, Config) ->
+ meck:unload(),
+ Config.
+
basics(Config) ->
ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
@@ -70,7 +80,7 @@ basics(Config) ->
CustomerTag = UId,
ok = start_cluster(ClusterName, [ServerId]),
FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0),
+ {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0),
ra_log_wal:force_roll_over(ra_log_wal),
% create segment the segment will trigger a snapshot
@@ -167,7 +177,7 @@ duplicate_delivery(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
Fun = fun Loop(S0) ->
receive
@@ -201,7 +211,7 @@ usage(Config) ->
ServerId = ?config(node_id, Config),
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1),
{ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2),
{_, _, _} = process_ra_events(F3, 50),
@@ -256,7 +266,7 @@ detects_lost_delivery(Config) ->
F000 = rabbit_fifo_client:init(ClusterName, [ServerId]),
{ok, F00} = rabbit_fifo_client:enqueue(msg1, F000),
{_, _, F0} = process_ra_events(F00, 100),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg2, F1),
{ok, F3} = rabbit_fifo_client:enqueue(msg3, F2),
% lose first delivery
@@ -285,7 +295,8 @@ returns_after_down(Config) ->
Self = self(),
_Pid = spawn(fun () ->
F = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10,
+ undefined, F),
Self ! checkout_done
end),
receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end,
@@ -327,8 +338,11 @@ handles_reject_notification(Config) ->
CId = {UId1, self()},
ok = start_cluster(ClusterName, [ServerId1, ServerId2]),
- _ = ra:process_command(ServerId1, {checkout,
- {auto, 10, simple_prefetch}, CId}),
+ _ = ra:process_command(ServerId1,
+ rabbit_fifo:make_checkout(
+ CId,
+ {auto, 10, simple_prefetch},
+ #{})),
% reverse order - should try the first node in the list first
F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]),
{ok, F1} = rabbit_fifo_client:enqueue(one, F0),
@@ -346,20 +360,21 @@ discard(Config) ->
ServerId = ?config(node_id, Config),
UId = ?config(uid, Config),
ClusterName = ?config(cluster_name, Config),
- Conf = #{cluster_name => ClusterName,
+ Conf = #{cluster_name => ClusterName#resource.name,
id => ServerId,
uid => UId,
log_init_args => #{data_dir => PrivDir, uid => UId},
initial_member => [],
machine => {module, rabbit_fifo,
- #{dead_letter_handler =>
+ #{queue_resource => discard,
+ dead_letter_handler =>
{?MODULE, dead_letter_handler, [self()]}}}},
_ = ra:start_server(Conf),
ok = ra:trigger_election(ServerId),
_ = ra:members(ServerId),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId]),
- {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0),
+ {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0),
{ok, F2} = rabbit_fifo_client:enqueue(msg1, F1),
F3 = discard_next_delivery(F2, 500),
{ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3),
@@ -380,7 +395,7 @@ cancel_checkout(Config) ->
ok = start_cluster(ClusterName, [ServerId]),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
{ok, F1} = rabbit_fifo_client:enqueue(m1, F0),
- {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1),
+ {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1),
{_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end),
{ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3),
{ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4),
@@ -395,7 +410,7 @@ credit(Config) ->
{ok, F2} = rabbit_fifo_client:enqueue(m2, F1),
{_, _, F3} = process_ra_events(F2, [], 250),
%% checkout with 0 prefetch
- {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3),
+ {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3),
%% assert no deliveries
{_, _, F5} = process_ra_events0(F4, [], [], 250,
fun
@@ -464,7 +479,7 @@ test_queries(Config) ->
receive stop -> ok end
end),
F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4),
- {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0),
+ {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0),
{ok, {_, Ready}, _} = ra:local_query(ServerId,
fun rabbit_fifo:query_messages_ready/1),
?assertEqual(1, maps:size(Ready)),
@@ -485,7 +500,7 @@ dead_letter_handler(Pid, Msgs) ->
Pid ! {dead_letter, Msgs}.
dequeue(Config) ->
- ClusterName = ?config(priv_dir, Config),
+ ClusterName = ?config(cluster_name, Config),
ServerId = ?config(node_id, Config),
UId = ?config(uid, Config),
Tag = UId,
@@ -611,14 +626,15 @@ validate_process_down(Name, Num) ->
end.
start_cluster(ClusterName, ServerIds, RaFifoConfig) ->
- {ok, Started, _} = ra:start_cluster(ClusterName,
+ {ok, Started, _} = ra:start_cluster(ClusterName#resource.name,
{module, rabbit_fifo, RaFifoConfig},
ServerIds),
?assertEqual(length(Started), length(ServerIds)),
ok.
start_cluster(ClusterName, ServerIds) ->
- start_cluster(ClusterName, ServerIds, #{}).
+ start_cluster(ClusterName, ServerIds, #{name => some_name,
+ queue_resource => ClusterName}).
flush() ->
receive
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 48e7b9aa7f..1c1ab42a9e 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -64,14 +64,14 @@ scenario1(_Config) ->
E = c:pid(0,6720,1),
Commands = [
- {checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {enqueue,E,2,msg2},
- {checkout,cancel,C1}, %% both on returns queue
- {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
- {return,[0],C2}, %% E1 in returns, E2 with C2
- {return,[1],C2}, %% E2 in returns E1 with C2
- {settle,[2],C2} %% E2 with C2
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_enqueue(E,2,msg2),
+ make_checkout(C1, cancel), %% both on returns queue
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_return(C2, [0]), %% E1 in returns, E2 with C2
+ make_return(C2, [1]), %% E2 in returns E1 with C2
+ make_settle(C2, [2]) %% E2 with C2
],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -80,13 +80,13 @@ scenario2(_Config) ->
C1 = {<<>>, c:pid(0,346,1)},
C2 = {<<>>,c:pid(0,379,1)},
E = c:pid(0,327,1),
- Commands = [{checkout,{auto,1,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {checkout,cancel,C1},
- {enqueue,E,2,msg2},
- {checkout,{auto,1,simple_prefetch},C2},
- {settle,[0],C1},
- {settle,[0],C2}
+ Commands = [make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_checkout(C1, cancel),
+ make_enqueue(E,2,msg2),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_settle(C1, [0]),
+ make_settle(C2, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -94,22 +94,24 @@ scenario2(_Config) ->
scenario3(_Config) ->
C1 = {<<>>, c:pid(0,179,1)},
E = c:pid(0,176,1),
- Commands = [{checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg1},
- {return,[0],C1},
- {enqueue,E,2,msg2},
- {enqueue,E,3,msg3},
- {settle,[1],C1},
- {settle,[2],C1}],
+ Commands = [make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E,1,msg1),
+ make_return(C1, [0]),
+ make_enqueue(E,2,msg2),
+ make_enqueue(E,3,msg3),
+ make_settle(C1, [1]),
+ make_settle(C1, [2])
+ ],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
scenario4(_Config) ->
C1 = {<<>>, c:pid(0,179,1)},
E = c:pid(0,176,1),
-Commands = [{checkout,{auto,1,simple_prefetch},C1},
- {enqueue,E,1,msg},
- {settle,[0],C1}],
+ Commands = [make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,msg),
+ make_settle(C1, [0])
+ ],
run_snapshot_test(?FUNCTION_NAME, Commands),
ok.
@@ -167,6 +169,7 @@ checkout_gen(Pid) ->
-record(t, {state = rabbit_fifo:init(#{name => proper,
+ queue_resource => blah,
shadow_copy_interval => 1})
:: rabbit_fifo:state(),
index = 1 :: non_neg_integer(), %% raft index
@@ -201,7 +204,7 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0,
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = {enqueue, Pid, MsgSeq, msg},
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg),
case When of
enqueue ->
do_apply(Cmd, T#t{enqueuers = Enqs});
@@ -218,7 +221,7 @@ handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
end, Cons0)) of
[CId | _] ->
Cons = maps:remove(CId, Cons0),
- Cmd = {checkout, cancel, CId},
+ Cmd = rabbit_fifo:make_checkout(CId, cancel, #{}),
do_apply(Cmd, T#t{consumers = Cons});
_ ->
T
@@ -230,7 +233,13 @@ handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) ->
T;
_ ->
Cons = maps:put(CId, ok, Cons0),
- Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId},
+ Cmd = rabbit_fifo:make_checkout(CId,
+ {auto, Prefetch, simple_prefetch},
+ #{ack => true,
+ prefetch => Prefetch,
+ username => <<"user">>,
+ args => []}),
+
do_apply(Cmd, T#t{consumers = Cons})
end;
handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) ->
@@ -253,14 +262,19 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) ->
handle_op({input_event, Settlement}, #t{effects = Effs} = T) ->
case queue:out(Effs) of
{{value, {settle, MsgIds, CId}}, Q} ->
- do_apply({Settlement, MsgIds, CId}, T#t{effects = Q});
- {{value, {enqueue, _, _, _} = Cmd}, Q} ->
+ Cmd = case Settlement of
+ settle -> rabbit_fifo:make_settle(CId, MsgIds);
+ return -> rabbit_fifo:make_return(CId, MsgIds);
+ discard -> rabbit_fifo:make_discard(CId, MsgIds)
+ end,
+ do_apply(Cmd, T#t{effects = Q});
+ {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
do_apply(Cmd, T#t{effects = Q});
_ ->
T
end;
handle_op(purge, T) ->
- do_apply(purge, T).
+ do_apply(rabbit_fifo:make_purge(), T).
do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
log = Log} = T) ->
@@ -275,7 +289,7 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
MsgIds = [I || {I, _} <- Msgs],
%% always make settle commands by default
%% they can be changed depending on the input event later
- Cmd = {settle, MsgIds, {CTag, P}},
+ Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds),
enq_effs(Rem, queue:in(Cmd, Q));
enq_effs([_ | Rem], Q) ->
% ct:pal("enq_effs dropping ~w~n", [E]),
@@ -310,18 +324,8 @@ run_snapshot_test0(Name, Commands) ->
Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true;
(_) -> false
end, Entries),
- % L = case Filtered of
- % [] -> undefined;
- % _ ->lists:last(Filtered)
- % end,
-
- % ct:pal("running from snapshot: ~b to ~w"
- % "~n~p~n",
- % [SnapIdx, L, SnapState]),
{S, _} = run_log(SnapState, Filtered),
% assert log can be restored from any release cursor index
- % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n",
- % [Name, SnapIdx, S, State, SnapState, Filtered]),
?assertEqual(State, S)
end || {release_cursor, SnapIdx, SnapState} <- Effects],
ok.
@@ -342,7 +346,20 @@ run_log(InitState, Entries) ->
test_init(Name) ->
rabbit_fifo:init(#{name => Name,
+ queue_resource => blah,
shadow_copy_interval => 0,
metrics_handler => {?MODULE, metrics_handler, []}}).
meta(Idx) ->
#{index => Idx, term => 1}.
+
+make_checkout(Cid, Spec) ->
+ rabbit_fifo:make_checkout(Cid, Spec, #{}).
+
+make_enqueue(Pid, Seq, Msg) ->
+ rabbit_fifo:make_enqueue(Pid, Seq, Msg).
+
+make_settle(Cid, MsgIds) ->
+ rabbit_fifo:make_settle(Cid, MsgIds).
+
+make_return(Cid, MsgIds) ->
+ rabbit_fifo:make_return(Cid, MsgIds).
diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl
index be9c8d8698..f74e18afb9 100644
--- a/test/unit_SUITE.erl
+++ b/test/unit_SUITE.erl
@@ -57,11 +57,6 @@ groups() ->
check_shutdown_ignored
]},
table_codec,
- {truncate, [parallel], [
- short_examples_exactly,
- term_limit,
- large_examples_for_size
- ]},
unfold,
{vm_memory_monitor, [parallel], [
parse_line_linux
@@ -698,71 +693,6 @@ check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) ->
Res.
%% ---------------------------------------------------------------------------
-%% truncate.
-%% ---------------------------------------------------------------------------
-
-short_examples_exactly(_Config) ->
- F = fun (Term, Exp) ->
- Exp = truncate:term(Term, {1, {10, 10, 5, 5}}),
- Term = truncate:term(Term, {100000, {10, 10, 5, 5}})
- end,
- FSmall = fun (Term, Exp) ->
- Exp = truncate:term(Term, {1, {2, 2, 2, 2}}),
- Term = truncate:term(Term, {100000, {2, 2, 2, 2}})
- end,
- F([], []),
- F("h", "h"),
- F("hello world", "hello w..."),
- F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]),
- F([a|b], [a|b]),
- F(<<"hello">>, <<"hello">>),
- F([<<"hello world">>], [<<"he...">>]),
- F(<<1:1>>, <<1:1>>),
- F(<<1:81>>, <<0:56, "...">>),
- F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}),
- FSmall({a,30,40,40,40,40}, {a,30,'...'}),
- FSmall([a,30,40,40,40,40], [a,30,'...']),
- P = spawn(fun() -> receive die -> ok end end),
- F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]),
- P ! die,
- R = make_ref(),
- F([R], [R]),
- ok.
-
-term_limit(_Config) ->
- W = erlang:system_info(wordsize),
- S = <<"abc">>,
- 1 = truncate:term_size(S, 4, W),
- limit_exceeded = truncate:term_size(S, 3, W),
- case 100 - truncate:term_size([S, S], 100, W) of
- 22 -> ok; %% 32 bit
- 38 -> ok %% 64 bit
- end,
- case 100 - truncate:term_size([S, [S]], 100, W) of
- 30 -> ok; %% ditto
- 54 -> ok
- end,
- limit_exceeded = truncate:term_size([S, S], 6, W),
- ok.
-
-large_examples_for_size(_Config) ->
- %% Real world values
- Shrink = fun(Term) -> truncate:term(Term, {1, {1000, 100, 50, 5}}) end,
- TestSize = fun(Term) ->
- true = 5000000 < size(term_to_binary(Term)),
- true = 500000 > size(term_to_binary(Shrink(Term)))
- end,
- TestSize(lists:seq(1, 5000000)),
- TestSize(recursive_list(1000, 10)),
- TestSize(recursive_list(5000, 20)),
- TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])),
- TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])),
- ok.
-
-recursive_list(S, 0) -> lists:seq(1, S);
-recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)].
-
-%% ---------------------------------------------------------------------------
%% vm_memory_monitor.
%% ---------------------------------------------------------------------------