diff options
| author | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 14:12:27 +0100 |
|---|---|---|
| committer | Arnaud Cogoluègnes <acogoluegnes@gmail.com> | 2018-12-14 14:12:27 +0100 |
| commit | 6bec963773f5687c9fa24a7ce902d2bc9fa272c2 (patch) | |
| tree | cbb0d0ed9fbe0bf8275f1f02bd6375c4565bec8f | |
| parent | 8317600c5f65b42149827849f5c5fcaf766a5512 (diff) | |
| parent | 685f8ea30c0654f9264ec3b25a9ccceb1a2e9a3e (diff) | |
| download | rabbitmq-server-git-6bec963773f5687c9fa24a7ce902d2bc9fa272c2.tar.gz | |
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
| -rw-r--r-- | src/lqueue.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 497 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 73 | ||||
| -rw-r--r-- | src/truncate.erl | 127 | ||||
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 14 | ||||
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 50 | ||||
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 101 | ||||
| -rw-r--r-- | test/unit_SUITE.erl | 70 |
10 files changed, 509 insertions, 542 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 1abe4e0b82..2b75ef4856 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -25,27 +25,32 @@ -define(QUEUE, queue). --export_type([?MODULE/0]). +-export_type([ + ?MODULE/0, + ?MODULE/1 + ]). --opaque ?MODULE() :: {non_neg_integer(), queue:queue()}. +-opaque ?MODULE() :: {non_neg_integer(), queue:queue(term())}. +-opaque ?MODULE(T) :: {non_neg_integer(), queue:queue(T)}. -type value() :: any(). --type result() :: 'empty' | {'value', value()}. +-type result(T) :: 'empty' | {'value', T}. --spec new() -> ?MODULE(). --spec drop(?MODULE()) -> ?MODULE(). --spec is_empty(?MODULE()) -> boolean(). --spec len(?MODULE()) -> non_neg_integer(). --spec in(value(), ?MODULE()) -> ?MODULE(). +-spec new() -> ?MODULE(_). +-spec drop(?MODULE(T)) -> ?MODULE(T). +-spec is_empty(?MODULE(_)) -> boolean(). +-spec len(?MODULE(_)) -> non_neg_integer(). +-spec in(T, ?MODULE(T)) -> ?MODULE(T). -spec in_r(value(), ?MODULE()) -> ?MODULE(). --spec out(?MODULE()) -> {result(), ?MODULE()}. --spec out_r(?MODULE()) -> {result(), ?MODULE()}. --spec join(?MODULE(), ?MODULE()) -> ?MODULE(). --spec foldl(fun ((value(), B) -> B), B, ?MODULE()) -> B. --spec foldr(fun ((value(), B) -> B), B, ?MODULE()) -> B. --spec from_list([value()]) -> ?MODULE(). --spec to_list(?MODULE()) -> [value()]. --spec peek(?MODULE()) -> result(). --spec peek_r(?MODULE()) -> result(). +-spec out(?MODULE(T)) -> {result(T), ?MODULE()}. +-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}. +-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B). +-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B. +-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B. +-spec from_list([T]) -> ?MODULE(T). +-spec to_list(?MODULE(T)) -> [T]. +% -spec peek(?MODULE()) -> result(). +-spec peek(?MODULE(T)) -> result(T). +-spec peek_r(?MODULE(T)) -> result(T). new() -> {0, ?QUEUE:new()}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 0f7d569a00..aa9889aef6 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -926,8 +926,12 @@ notify_policy_changed(#amqqueue{pid = QPid, name = QName}) when ?IS_QUORUM(QPid) -> rabbit_quorum_queue:policy_changed(QName, QPid). -consumers(#amqqueue{ pid = QPid }) -> - delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). +consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); +consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) -> + {ok, {_, Result}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_consumers/1), + maps:values(Result). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -1154,14 +1158,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP [rabbit_misc:rs(QName), Reason]) end. -basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerPrefetchCount, ConsumerTag, +basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, + LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QState) -> ok = check_consume_arguments(QName, Args), - case delegate:invoke(QPid, {gen_server2, call, - [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, - Args, OkMsg, ActingUser}, infinity]}) of + case delegate:invoke(QPid, + {gen_server2, call, + [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg, ActingUser}, infinity]}) of ok -> {ok, QState}; Err -> @@ -1171,15 +1176,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid, _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag, _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) -> {error, global_qos_not_supported_for_queue_type}; -basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, NoAck, ChPid, - _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag, - ExclusiveConsume, Args, OkMsg, _ActingUser, QStates) -> +basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, + NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount, + ConsumerTag, ExclusiveConsume, Args, OkMsg, + ActingUser, QStates) -> ok = check_consume_arguments(QName, Args), QState0 = get_quorum_state(Id, QName, QStates), {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, + ActingUser, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 00d0db0b8a..2d5c267227 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -39,18 +39,23 @@ query_processes/1, query_ra_indexes/1, query_consumer_count/1, + query_consumers/1, usage/1, %% misc - dehydrate_state/1 + dehydrate_state/1, + + %% protocol helpers + make_enqueue/3, + make_checkout/3, + make_settle/2, + make_return/2, + make_discard/2, + make_credit/4, + make_purge/0, + make_update_state/1 ]). --ifdef(TEST). --export([ - metrics_handler/1 - ]). --endif. - -type raw_msg() :: term(). %% The raw message. It is opaque to rabbit_fifo. @@ -99,19 +104,43 @@ {dequeue, settled | unsettled} | cancel. --type protocol() :: - {enqueue, Sender :: maybe(pid()), MsgSeq :: maybe(msg_seqno()), - Msg :: raw_msg()} | - {checkout, Spec :: checkout_spec(), Consumer :: consumer_id()} | - {settle, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {return, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {discard, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {credit, - Credit :: non_neg_integer(), - DeliveryCount :: non_neg_integer(), - Drain :: boolean(), - Consumer :: consumer_id()} | - purge. +-type consumer_meta() :: #{ack => boolean(), + username => binary(), + prefetch => non_neg_integer(), + args => list()}. +%% static meta data associated with a consumer + +%% command records representing all the protocol actions that are supported +-record(enqueue, {pid :: maybe(pid()), + seq :: maybe(msg_seqno()), + msg :: raw_msg()}). +-record(checkout, {consumer_id :: consumer_id(), + spec :: checkout_spec(), + meta :: consumer_meta()}). +-record(settle, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(return, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(discard, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(credit, {consumer_id :: consumer_id(), + credit :: non_neg_integer(), + delivery_count :: non_neg_integer(), + drain :: boolean()}). +-record(purge, {}). +-record(update_state, {config :: config()}). + + + +-opaque protocol() :: + #enqueue{} | + #checkout{} | + #settle{} | + #return{} | + #discard{} | + #credit{} | + #purge{} | + #update_state{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -126,7 +155,8 @@ -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, - {checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, + {meta = #{} :: consumer_meta(), + checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, next_msg_id = 0 :: msg_id(), % part of snapshot data %% max number of messages that can be sent %% decremented for each delivery @@ -153,6 +183,7 @@ -record(state, {name :: atom(), + queue_resource :: rabbit_types:r('queue'), shadow_copy_interval = ?SHADOW_COPY_INTERVAL :: non_neg_integer(), % unassigned messages messages = #{} :: #{msg_in_id() => indexed_msg()}, @@ -163,7 +194,7 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'), + returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -183,9 +214,7 @@ % needs to be part of snapshot service_queue = queue:new() :: queue:queue(consumer_id()), dead_letter_handler :: maybe(applied_mfa()), - cancel_consumer_handler :: maybe(applied_mfa()), become_leader_handler :: maybe(applied_mfa()), - metrics_handler :: maybe(applied_mfa()), %% This is a special field that is only used for snapshots %% It represents the number of queued messages at the time the %% dehydrated snapshot state was cached. @@ -203,16 +232,17 @@ -opaque state() :: #state{}. -type config() :: #{name := atom(), + queue_resource := rabbit_types:r('queue'), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), - cancel_consumer_handler => applied_mfa(), - metrics_handler => applied_mfa(), shadow_copy_interval => non_neg_integer()}. -export_type([protocol/0, delivery/0, command/0, + credit_mode/0, consumer_tag/0, + consumer_meta/0, consumer_id/0, client_msg/0, msg/0, @@ -222,20 +252,18 @@ state/0, config/0]). --spec init(config()) -> {state(), ra_machine:effects()}. -init(#{name := Name} = Conf) -> - update_state(Conf, #state{name = Name}). +-spec init(config()) -> state(). +init(#{name := Name, + queue_resource := Resource} = Conf) -> + update_state(Conf, #state{name = Name, + queue_resource = Resource}). update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), - CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), - MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), State#state{dead_letter_handler = DLH, - cancel_consumer_handler = CCH, become_leader_handler = BLH, - metrics_handler = MH, shadow_copy_interval = SHI}. % msg_ids are scoped per consumer @@ -243,7 +271,8 @@ update_state(Conf, State) -> -spec apply(ra_machine:command_meta_data(), command(), ra_machine:effects(), state()) -> {state(), ra_machine:effects(), Reply :: term()}. -apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> +apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of {ok, State0, Effects1} -> {State, Effects, ok} = checkout(State0, Effects1), @@ -251,7 +280,8 @@ apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> {duplicate, State, Effects} -> {State, Effects, ok} end; -apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0, +apply(#{index := RaftIdx}, + #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> @@ -262,8 +292,8 @@ apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0, _ -> {State, Effects0, ok} end; -apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0, - #state{consumers = Cons0} = State0) -> +apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, + Effects0, #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds, @@ -274,7 +304,7 @@ apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0, _ -> {State0, Effects0, ok} end; -apply(_, {return, MsgIds, ConsumerId}, Effects0, +apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> @@ -285,7 +315,8 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0, _ -> {State, Effects0, ok} end; -apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, +apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, + drain = Drain, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -328,25 +359,29 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, %% credit for unknown consumer - just ignore {State0, Effects0, ok} end; -apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, +apply(_, #checkout{spec = {dequeue, _}}, Effects0, #state{messages = M, prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; -apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, +apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, + consumer_id = ConsumerId}, Effects0, State0) -> % TODO: this clause could probably be optimised - State1 = update_consumer(ConsumerId, {once, 1, simple_prefetch}, State0), + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), % turn send msg effect into reply {success, _, MsgId, Msg, State2} = checkout_one(State1), % immediately settle - {State, Effects, _} = apply(Meta, {settle, [MsgId], ConsumerId}, + {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]), Effects0, State2), {State, Effects, {dequeue, {MsgId, Msg}}}; -apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer}, +apply(_, #checkout{spec = {dequeue, unsettled}, + meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, Effects0, State0) -> - State1 = update_consumer(Consumer, {once, 1, simple_prefetch}, State0), + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), Effects1 = [{monitor, process, Pid} | Effects0], {State, Reply, Effects} = case checkout_one(State1) of {success, _, MsgId, Msg, S} -> @@ -357,16 +392,17 @@ apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer}, {S, empty, Effects1} end, {State, Effects, {dequeue, Reply}}; -apply(_, {checkout, cancel, ConsumerId}, Effects0, State0) -> +apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) -> {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}), % TODO: here we should really demonitor the pid but _only_ if it has no % other consumers or enqueuers. checkout(State1, CancelEffects); -apply(_, {checkout, Spec, {_Tag, Pid} = ConsumerId}, Effects0, State0) -> - State1 = update_consumer(ConsumerId, Spec, State0), +apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId}, + Effects0, State0) -> + State1 = update_consumer(ConsumerId, Meta, Spec, State0), {State, Effects, Res} = checkout(State1, Effects0), {State, [{monitor, process, Pid} | Effects], Res}; -apply(#{index := RaftIdx}, purge, Effects0, +apply(#{index := RaftIdx}, #purge{}, Effects0, #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> Total = rabbit_fifo_index:size(Indexes), {State1, Effects1, _} = @@ -470,7 +506,7 @@ apply(_, {nodeup, Node}, Effects0, service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}; -apply(_, {update_state, Conf}, Effects, State) -> +apply(_, #update_state{config = Conf}, Effects, State) -> {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). @@ -504,21 +540,17 @@ state_enter(_, _) -> -spec tick(non_neg_integer(), state()) -> ra_machine:effects(). tick(_Ts, #state{name = Name, + queue_resource = QName, messages = Messages, ra_indexes = Indexes, - metrics_handler = MH, consumers = Cons} = State) -> Metrics = {Name, maps:size(Messages), % Ready num_checked_out(State), % checked out rabbit_fifo_index:size(Indexes), %% Total maps:size(Cons)}, % Consumers - case MH of - undefined -> - [{aux, emit}]; - {Mod, Fun, Args} -> - [{mod_call, Mod, Fun, Args ++ [Metrics]}, {aux, emit}] - end. + [{mod_call, rabbit_quorum_queue, + update_metrics, [QName, Metrics]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#state{consumers = Cons, @@ -582,6 +614,14 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) -> query_consumer_count(#state{consumers = Consumers}) -> maps:size(Consumers). +query_consumers(#state{consumers = Consumers}) -> + maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)} + end, Consumers). %% other -spec usage(atom()) -> float(). @@ -627,11 +667,11 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - {Effects0, #state{consumers = C0, name = Name} = S0}) -> + {Effects0, #state{consumers = C0} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), + Effects = cancel_consumer_effects(ConsumerId, S, Effects0), case maps:size(Cons) of 0 -> {[{aux, inactive} | Effects], S#state{consumers = Cons}}; @@ -787,13 +827,9 @@ dead_letter_effects(Discarded, end, [], Discarded), [{mod_call, Mod, Fun, Args ++ [DeadLetters]} | Effects]. -cancel_consumer_effects(_, _, #state{cancel_consumer_handler = undefined}, - Effects) -> - Effects; -cancel_consumer_effects(Pid, Name, - #state{cancel_consumer_handler = {Mod, Fun, Args}}, - Effects) -> - [{mod_call, Mod, Fun, Args ++ [Pid, Name]} | Effects]. +cancel_consumer_effects(ConsumerId, #state{queue_resource = QName}, Effects) -> + [{mod_call, rabbit_quorum_queue, + cancel_consumer_handler, [QName, ConsumerId]} | Effects]. update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, @@ -1017,11 +1053,12 @@ uniq_queue_in(Key, Queue) -> end. -update_consumer(ConsumerId, {Life, Credit, Mode}, +update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer - Init = #consumer{lifetime = Life, credit = Credit, credit_mode = Mode}, + Init = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, Cons = maps:update_with(ConsumerId, fun(S) -> %% remove any in-flight messages from @@ -1074,6 +1111,41 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), Con#consumer{checked_out = Checked}. +-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol(). +make_enqueue(Pid, Seq, Msg) -> + #enqueue{pid = Pid, seq = Seq, msg = Msg}. +-spec make_checkout(consumer_id(), + checkout_spec(), consumer_meta()) -> protocol(). +make_checkout(ConsumerId, Spec, Meta) -> + #checkout{consumer_id = ConsumerId, + spec = Spec, meta = Meta}. + +-spec make_settle(consumer_id(), [msg_id()]) -> protocol(). +make_settle(ConsumerId, MsgIds) -> + #settle{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_return(consumer_id(), [msg_id()]) -> protocol(). +make_return(ConsumerId, MsgIds) -> + #return{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_discard(consumer_id(), [msg_id()]) -> protocol(). +make_discard(ConsumerId, MsgIds) -> + #discard{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(), + boolean()) -> protocol(). +make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> + #credit{consumer_id = ConsumerId, + credit = Credit, + delivery_count = DeliveryCount, + drain = Drain}. + +-spec make_purge() -> protocol(). +make_purge() -> #purge{}. + +-spec make_update_state(config()) -> protocol(). +make_update_state(Config) -> + #update_state{config = Config}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1098,18 +1170,17 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> test_init(Name) -> init(#{name => Name, - shadow_copy_interval => 0, - metrics_handler => {?MODULE, metrics_handler, []}}). - -metrics_handler(_) -> - ok. + queue_resource => queue_resource, + shadow_copy_interval => 0}). enq_enq_checkout_test() -> Cid = {<<"enq_enq_checkout_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), {_State3, Effects, _} = - apply(meta(3), {checkout, {once, 2, simple_prefetch}, Cid}, [], State2), + apply(meta(3), + make_checkout(Cid, {once, 2, simple_prefetch}, #{}), + [], State2), ?ASSERT_EFF({monitor, _, _}, Effects), ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), ok. @@ -1119,7 +1190,8 @@ credit_enq_enq_checkout_settled_credit_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), {State3, Effects, _} = - apply(meta(3), {checkout, {auto, 1, credited}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), + [], State2), ?ASSERT_EFF({monitor, _, _}, Effects), Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; (_) -> false @@ -1149,12 +1221,13 @@ credit_with_drained_test() -> State0 = test_init(test), %% checkout with a single credit {State1, _, _} = - apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0), + apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}), + [], State0), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, delivery_count = 0}}}, State1), {State, _Effs, Result} = - apply(meta(3), {credit, 0, 5, true, Cid}, [], State1), + apply(meta(3), make_credit(Cid, 0, 5, true), [], State1), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), @@ -1169,12 +1242,13 @@ credit_and_drain_test() -> {State2, _} = enq(2, 2, second, State1), %% checkout without any initial credit (like AMQP 1.0 would) {State3, CheckEffs, _} = - apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}), + [], State2), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), {State4, Effects, {multi, [{send_credit_reply, 0}, {send_drained, [{?FUNCTION_NAME, 2}]}]}} = - apply(meta(4), {credit, 4, 0, true, Cid}, [], State3), + apply(meta(4), make_credit(Cid, 4, 0, true), [], State3), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), @@ -1193,7 +1267,8 @@ enq_enq_deq_test() -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State2), ok. enq_enq_deq_deq_settle_test() -> @@ -1202,9 +1277,11 @@ enq_enq_deq_deq_settle_test() -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State2), {_State4, _Effects4, {dequeue, empty}} = - apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State3), ok. enq_enq_checkout_get_settled_test() -> @@ -1212,23 +1289,28 @@ enq_enq_checkout_get_settled_test() -> {State1, _} = enq(1, 1, first, test_init(test)), % get returns a reply value {_State2, _Effects, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. checkout_get_empty_test() -> Cid = {?FUNCTION_NAME, self()}, State = test_init(test), {_State2, [], {dequeue, empty}} = - apply(meta(1), {checkout, {dequeue, unsettled}, Cid}, [], State), + apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State), ok. untracked_enq_deq_test() -> Cid = {?FUNCTION_NAME, self()}, State0 = test_init(test), - {State1, _, _} = apply(meta(1), {enqueue, undefined, undefined, first}, [], State0), + {State1, _, _} = apply(meta(1), + make_enqueue(undefined, undefined, first), [], State0), {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. + release_cursor_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), @@ -1299,7 +1381,7 @@ return_non_existent_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), % return non-existent - {_State2, [], _} = apply(meta(3), {return, [99], Cid}, [], State0), + {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0), ok. return_checked_out_test() -> @@ -1309,7 +1391,8 @@ return_checked_out_test() -> {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} = check(Cid, 2, State0), % return - {_State2, [_, _], _} = apply(meta(3), {return, [MsgId], Cid}, [], State1), + {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]), + [], State1), ok. return_auto_checked_out_test() -> @@ -1322,7 +1405,7 @@ return_auto_checked_out_test() -> {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} = check_auto(Cid, 2, State0), % return should include another delivery - {_State2, Effects, _} = apply(meta(3), {return, [MsgId], Cid}, [], State1), + {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, Effects), @@ -1335,19 +1418,22 @@ cancelled_checkout_out_test() -> {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue - {State2, _, _} = apply(meta(3), {checkout, cancel, Cid}, [], State1), + {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), + [], State1), + ?assertEqual(2, maps:size(State2#state.messages)), {State3, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2), + ?debugFmt("State3 ~p", [State3]), {_State, _, {dequeue, {_, {_, second}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3), ok. down_with_noproc_consumer_returns_unsettled_test() -> Cid = {<<"down_consumer_returns_unsettled_test">>, self()}, {State0, [_, _]} = enq(1, 1, second, test_init(test)), {State1, [{monitor, process, Pid} | _]} = check(Cid, 2, State0), - {State2, [_, _], _} = apply(meta(3), {down, Pid, noproc}, [], State1), + {State2, _, _} = apply(meta(3), {down, Pid, noproc}, [], State1), {_State, Effects} = check(Cid, 4, State2), ?ASSERT_EFF({monitor, process, _}, Effects), ok. @@ -1416,7 +1502,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1), + {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -1425,6 +1511,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, State00 = init(#{name => test, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), dead_letter_handler => {somemod, somefun, [somearg]}}), {State0, [_, _]} = enq(1, 1, first, State00), @@ -1432,7 +1519,8 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1), + {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), + [], State1), % assert mod call effect with appended reason and message ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, Effects2), @@ -1445,25 +1533,23 @@ tick_test() -> {S1, _} = enq(2, 2, snd, S0), {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), {S3, {_, _}} = deq(4, Cid2, unsettled, S2), - {S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3), + {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3), - [{mod_call, _, _, [{test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), + [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), ok. enq_deq_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - % OthPid = spawn(fun () -> ok end), - % Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, settled}, Cid}, - {enqueue, self(), 3, three}, - {enqueue, self(), 4, four}, - {checkout, {dequeue, settled}, Cid}, - {enqueue, self(), 5, five}, - {checkout, {dequeue, settled}, Cid} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, settled}, #{}), + make_enqueue(self(), 3, three), + make_enqueue(self(), 4, four), + make_checkout(Cid, {dequeue, settled}, #{}), + make_enqueue(self(), 5, five), + make_checkout(Cid, {dequeue, settled}, #{}) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1473,10 +1559,10 @@ enq_deq_settle_snapshot_recover_test() -> % OthPid = spawn(fun () -> ok end), % Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Cid} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Cid, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1486,13 +1572,13 @@ enq_deq_settle_snapshot_recover_2_test() -> OthPid = spawn(fun () -> ok end), Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, two}, - {checkout, {dequeue, unsettled}, Oth}, - {settle, [0], Oth} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Cid, [0]), + make_enqueue(self(), 3, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Oth, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1500,11 +1586,11 @@ snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {enqueue, self(), 3, three}, - purge + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_enqueue(self(), 3, three), + make_purge() ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1512,12 +1598,12 @@ enq_deq_return_settle_snapshot_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, %% to Cid - {checkout, {auto, 1, simple_prefetch}, Cid}, - {return, [0], Cid}, %% should be re-delivered to Cid - {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 - {settle, [1], Cid}, - {settle, [2], Cid} + make_enqueue(self(), 1, one), %% to Cid + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_return(Cid, [0]), %% should be re-delivered to Cid + make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 + make_settle(Cid, [1]), + make_settle(Cid, [2]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1525,10 +1611,10 @@ return_prefix_msg_count_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, - {checkout, {auto, 1, simple_prefetch}, Cid}, - {checkout, cancel, Cid}, - {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2 + make_enqueue(self(), 1, one), + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_checkout(Cid, cancel, #{}), + make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2 ], Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), @@ -1541,16 +1627,16 @@ return_settle_snapshot_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, %% to Cid - {checkout, {auto, 1, simple_prefetch}, Cid}, - {return, [0], Cid}, %% should be re-delivered to Oth - {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 - {settle, [1], Cid}, - {return, [2], Cid}, - {settle, [3], Cid}, - {enqueue, self(), 3, three}, - purge, - {enqueue, self(), 4, four} + make_enqueue(self(), 1, one), %% to Cid + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_return(Cid, [0]), %% should be re-delivered to Oth + make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 + make_settle(Cid, [1]), + make_return(Cid, [2]), + make_settle(Cid, [3]), + make_enqueue(self(), 3, three), + make_purge(), + make_enqueue(self(), 4, four) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1558,13 +1644,13 @@ enq_check_settle_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {settle, [1], Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, three}, - {settle, [2], Cid} + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_settle(Cid, [1]), + make_settle(Cid, [0]), + make_enqueue(self(), 3, three), + make_settle(Cid, [2]) ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1573,13 +1659,13 @@ enq_check_settle_snapshot_purge_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {settle, [1], Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, three}, - purge + make_checkout(Cid, {auto, 2, simple_prefetch},#{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_settle(Cid, [1]), + make_settle(Cid, [0]), + make_enqueue(self(), 3, three), + make_purge() ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1589,38 +1675,18 @@ enq_check_settle_duplicate_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, %% 0 - {enqueue, self(), 2, two}, %% 0 - {settle, [0], Cid}, - {settle, [1], Cid}, - {settle, [1], Cid}, - {enqueue, self(), 3, three}, - {settle, [2], Cid} + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), %% 0 + make_enqueue(self(), 2, two), %% 0 + make_settle(Cid, [0]), + make_settle(Cid, [1]), + make_settle(Cid, [1]), + make_enqueue(self(), 3, three), + make_settle(Cid, [2]) ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). - -multi_return_snapshot_test() -> - %% this was discovered using property testing - C1 = {<<>>, c:pid(0,6723,1)}, - C2 = {<<0>>,c:pid(0,6723,1)}, - E = c:pid(0,6720,1), - Commands = [ - {checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg}, - {enqueue,E,2,msg}, - {checkout,cancel,C1}, %% both on returns queue - {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 - {return,[0],C2}, %% E1 in returns, E2 with C2 - {return,[1],C2}, %% E2 in returns E1 with C2 - {settle,[2],C2} %% E2 with C2 - ], - run_snapshot_test(?FUNCTION_NAME, Commands), - ok. - - run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that @@ -1657,11 +1723,11 @@ delivery_query_returns_deliveries_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 5, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {enqueue, self(), 3, tre}, - {enqueue, self(), 4, for} + make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_enqueue(self(), 3, tre), + make_enqueue(self(), 4, for) ], Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), @@ -1677,7 +1743,8 @@ pending_enqueue_is_enqueued_on_down_test() -> {State0, _} = enq(1, 2, first, test_init(test)), {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0), {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. duplicate_delivery_test() -> @@ -1690,6 +1757,7 @@ duplicate_delivery_test() -> state_enter_test() -> S0 = init(#{name => the_name, + queue_resource => rabbit_misc:r(<<"/">>, queue, <<"test">>), become_leader_handler => {m, f, [a]}}), [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. @@ -1721,11 +1789,11 @@ state_enter_montors_and_notifications_test() -> purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _, {purge, 1}} = apply(meta(2), purge, [], State1), + {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} = - apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3), ok. purge_with_checkout_test() -> @@ -1733,7 +1801,7 @@ purge_with_checkout_test() -> {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), {State1, _} = enq(2, 1, first, State0), {State2, _} = enq(3, 2, second, State1), - {State3, _, {purge, 2}} = apply(meta(2), purge, [], State2), + {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), ?assertEqual(0, maps:size(Checked)), ok. @@ -1743,36 +1811,47 @@ meta(Idx) -> enq(Idx, MsgSeq, Msg, State) -> strip_reply( - apply(meta(Idx), {enqueue, self(), MsgSeq, Msg}, [], State)). + apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)). deq(Idx, Cid, Settlement, State0) -> {State, _, {dequeue, Msg}} = - apply(meta(Idx), {checkout, {dequeue, Settlement}, Cid}, [], State0), + apply(meta(Idx), + make_checkout(Cid, {dequeue, Settlement}, #{}), + [], State0), {State, Msg}. check_n(Cid, Idx, N, State) -> - strip_reply(apply(meta(Idx), - {checkout, {auto, N, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {auto, N, simple_prefetch}, #{}), + [], State)). check(Cid, Idx, State) -> - strip_reply(apply(meta(Idx), - {checkout, {once, 1, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {once, 1, simple_prefetch}, #{}), + [], State)). check_auto(Cid, Idx, State) -> - strip_reply(apply(meta(Idx), - {checkout, {auto, 1, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + [], State)). check(Cid, Idx, Num, State) -> - strip_reply(apply(meta(Idx), - {checkout, {once, Num, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {once, Num, simple_prefetch}, #{}), + [], State)). settle(Cid, Idx, MsgId, State) -> - strip_reply(apply(meta(Idx), {settle, [MsgId], Cid}, [], State)). + strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)). credit(Cid, Idx, Credit, DelCnt, Drain, State) -> - strip_reply(apply(meta(Idx), {credit, Credit, DelCnt, Drain, Cid}, [], State)). + strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain), + [], State)). -strip_reply({State, Effects, _Replu}) -> +strip_reply({State, Effects, _Reply}) -> {State, Effects}. run_log(InitState, Entries) -> @@ -1789,7 +1868,9 @@ run_log(InitState, Entries) -> aux_test() -> _ = ra_machine_ets:start_link(), Aux0 = init_aux(aux_test), - MacState = init(#{name => aux_test}), + MacState = init(#{name => aux_test, + queue_resource => + rabbit_misc:r(<<"/">>, queue, <<"test">>)}), Log = undefined, {no_reply, Aux, undefined} = handle_aux(leader, cast, active, Aux0, Log, MacState), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 635d85be4a..9cdb1dfbe7 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -24,8 +24,8 @@ init/2, init/3, init/5, - checkout/3, checkout/4, + checkout/5, cancel_checkout/2, enqueue/2, enqueue/3, @@ -42,6 +42,7 @@ ]). -include_lib("ra/include/ra.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). @@ -51,7 +52,7 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. --record(consumer, {last_msg_id :: seq(), +-record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). -record(state, {cluster_name :: ra_cluster_name(), @@ -98,7 +99,7 @@ init(ClusterName, Servers) -> %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. -spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). -init(ClusterName, Servers, SoftLimit) -> +init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, servers = Servers, @@ -107,7 +108,7 @@ init(ClusterName, Servers, SoftLimit) -> -spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). -init(ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) -> +init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, servers = Servers, @@ -135,7 +136,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, Node = pick_node(State0), {Next, State1} = next_enqueue_seq(State0), % by default there is no correlation id - Cmd = {enqueue, self(), Next, Msg}, + Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), case send_command(Node, Correlation, Cmd, low, State1) of {slow, _} = Ret when not Slow -> BlockFun(), @@ -177,8 +178,11 @@ enqueue(Msg, State) -> dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> Node = pick_node(State0), ConsumerId = consumer_id(ConsumerTag), - case ra:process_command(Node, {checkout, {dequeue, Settlement}, - ConsumerId}, Timeout) of + case ra:process_command(Node, + rabbit_fifo:make_checkout(ConsumerId, + {dequeue, Settlement}, + #{}), + Timeout) of {ok, {dequeue, Reply}, Leader} -> {ok, Reply, State0#state{leader = Leader}}; Err -> @@ -198,7 +202,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {ok, state()}. settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), - Cmd = {settle, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -232,7 +236,7 @@ settle(ConsumerTag, [_|_] = MsgIds, return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), % TODO: make rabbit_fifo return support lists of message ids - Cmd = {return, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -265,7 +269,7 @@ return(ConsumerTag, [_|_] = MsgIds, {ok | slow, state()}. discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), - Cmd = {discard, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -299,9 +303,10 @@ discard(ConsumerTag, [_|_] = MsgIds, %% %% @returns `{ok, State}' or `{error | timeout, term()}' -spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), + rabbit_fifo:consumer_meta(), state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, State0) -> - checkout(ConsumerTag, NumUnsettled, simple_prefetch, State0). +checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) -> + checkout(ConsumerTag, NumUnsettled, simple_prefetch, ConsumerInfo, State0). %% @doc Register with the rabbit_fifo queue to "checkout" messages as they %% become available. @@ -319,13 +324,17 @@ checkout(ConsumerTag, NumUnsettled, State0) -> %% @param State The {@module} state. %% %% @returns `{ok, State}' or `{error | timeout, term()}' --spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), +-spec checkout(rabbit_fifo:consumer_tag(), + NumUnsettled :: non_neg_integer(), CreditMode :: rabbit_fifo:credit_mode(), + Meta :: rabbit_fifo:consumer_meta(), state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, CreditMode, State0) -> +checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, State0) -> Servers = sorted_servers(State0), ConsumerId = {ConsumerTag, self()}, - Cmd = {checkout, {auto, NumUnsettled, CreditMode}, ConsumerId}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, + {auto, NumUnsettled, CreditMode}, + Meta), try_process_command(Servers, Cmd, State0). %% @doc Provide credit to the queue @@ -348,8 +357,8 @@ credit(ConsumerTag, Credit, Drain, %% add one as it is 0 indexed C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), Node = pick_node(State0), - Cmd = {credit, Credit, C#consumer.last_msg_id + 1, Drain, ConsumerId}, - ct:pal("sending credit ~w", [Cmd]), + Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, + C#consumer.last_msg_id + 1, Drain), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -372,7 +381,7 @@ credit(ConsumerTag, Credit, Drain, cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> Servers = sorted_servers(State0), ConsumerId = {ConsumerTag, self()}, - Cmd = {checkout, cancel, ConsumerId}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}), State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)}, try_process_command(Servers, Cmd, State). @@ -380,7 +389,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> %% of messages purged. -spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. purge(Node) -> - case ra:process_command(Node, purge) of + case ra:process_command(Node, rabbit_fifo:make_purge()) of {ok, {purge, Reply}, _} -> {ok, Reply}; Err -> @@ -393,7 +402,7 @@ cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. update_machine_state(Node, Conf) -> - case ra:process_command(Node, {update_state, Conf}) of + case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of {ok, ok, _} -> ok; Err -> @@ -520,7 +529,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) -> -spec untracked_enqueue([ra_server_id()], term()) -> ok. untracked_enqueue([Node | _], Msg) -> - Cmd = {enqueue, undefined, undefined, Msg}, + Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), ok = ra:pipeline_command(Node, Cmd), ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 5e854a4657..bb8af13b9d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -19,14 +19,14 @@ -export([init_state/2, handle_event/2]). -export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]). -export([info/1, info/2, stat/1, infos/1]). --export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]). +-export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]). -export([credit/4]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). -export([dead_letter_publish/4]). -export([queue_name/1]). -export([cluster_state/1, status/2]). --export([cancel_consumer_handler/3, cancel_consumer/3]). +-export([cancel_consumer_handler/2, cancel_consumer/3]). -export([become_leader/2, update_metrics/2]). -export([rpc_delete_metrics/1]). -export([format/1]). @@ -47,12 +47,9 @@ -spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. --spec declare(rabbit_types:amqqueue()) -> {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()}. -spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() | {'absent', rabbit_types:amqqueue(), atom()}]. -spec stop(rabbit_types:vhost()) -> 'ok'. --spec delete(rabbit_types:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> - {'ok', QLen :: non_neg_integer()}. -spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> @@ -61,15 +58,9 @@ rabbit_fifo_client:state()) -> {'ok', 'empty', rabbit_fifo_client:state()} | {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. --spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(), - ConsumerPrefetchCount :: non_neg_integer(), rabbit_types:ctag(), - ExclusiveConsume :: boolean(), Args :: rabbit_framing:amqp_table(), - any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. --spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) -> - rabbit_fifo_client:state(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). -spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). @@ -95,7 +86,7 @@ -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> rabbit_fifo_client:state(). -init_state({Name, _}, QName) -> +init_state({Name, _}, QName = #resource{}) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not %% know what to do if the queue has `disappeared`. Let it crash. @@ -104,13 +95,16 @@ init_state({Name, _}, QName) -> %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], - rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, + rabbit_fifo_client:init(QName, Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). +-spec declare(rabbit_types:amqqueue()) -> + {'new', rabbit_types:amqqueue()} | + {existing, rabbit_types:amqqueue()}. declare(#amqqueue{name = QName, durable = Durable, auto_delete = AutoDelete, @@ -150,20 +144,17 @@ declare(#amqqueue{name = QName, Ex end. - - ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. ra_machine_config(Q = #amqqueue{name = QName}) -> #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + queue_resource => QName, become_leader_handler => {?MODULE, become_leader, [QName]}, metrics_handler => {?MODULE, update_metrics, [QName]}}. -cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> +cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> Node = node(ChPid), - % QName = queue_name(Name), case Node == node() of true -> cancel_consumer(QName, ChPid, ConsumerTag); false -> @@ -247,9 +238,9 @@ recover(Queues) -> RaNodes = [{Name, Node} || Node <- Nodes], case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of ok -> ok; - Err -> + Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" - " be started ~w", [Name, Err]), + " be started ~w", [Name, Err2]), ok end; {error, {already_started, _}} -> @@ -275,7 +266,12 @@ stop(VHost) -> _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], ok. -delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, +-spec delete(rabbit_types:amqqueue(), + boolean(), boolean(), + rabbit_types:username()) -> + {ok, QLen :: non_neg_integer()}. +delete(#amqqueue{type = quorum, pid = {Name, _}, + name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -345,17 +341,35 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, {error, timeout} end. +-spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(), + ConsumerPrefetchCount :: non_neg_integer(), + rabbit_types:ctag(), ExclusiveConsume :: boolean(), + Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), + any(), rabbit_fifo_client:state()) -> + {'ok', rabbit_fifo_client:state()}. basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, - QState0) -> + ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, + ActingUser, OkMsg, QState0) -> + %% TODO: validate consumer arguments + %% currently quorum queues do not support any arguments maybe_send_reply(ChPid, OkMsg), - %% A prefetch count of 0 means no limitation, let's make it into something large for ra + ConsumerTag = quorum_ctag(ConsumerTag0), + %% A prefetch count of 0 means no limitation, + %% let's make it into something large for ra Prefetch = case ConsumerPrefetchCount of 0 -> 2000; Other -> Other end, - {ok, QState} = rabbit_fifo_client:checkout(quorum_ctag(ConsumerTag), - Prefetch, QState0), + %% consumer info is used to describe the consumer properties + ConsumerMeta = #{ack => not NoAck, + prefetch => ConsumerPrefetchCount, + args => Args, + username => ActingUser}, + {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag, + Prefetch, + ConsumerMeta, + QState0), + %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, ConsumerPrefetchCount, Args), @@ -369,6 +383,9 @@ stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], Delivery#delivery.message). +-spec deliver(Confirm :: boolean(), rabbit_types:delivery(), + rabbit_fifo_client:state()) -> + {ok | slow, rabbit_fifo_client:state()}. deliver(false, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); deliver(true, Delivery, QState0) -> @@ -404,8 +421,8 @@ cleanup_data_dir() -> <- rabbit_amqqueue:list_by_type(quorum), lists:member(node(), Nodes)], Registered = ra_directory:list_registered(), - [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, - not lists:member(Name, Names)], + _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], ok. maybe_delete_data_dir(UId) -> diff --git a/src/truncate.erl b/src/truncate.erl deleted file mode 100644 index 034bc62fa8..0000000000 --- a/src/truncate.erl +++ /dev/null @@ -1,127 +0,0 @@ -%% The contents of this file are subject to the Mozilla Public License -%% Version 1.1 (the "License"); you may not use this file except in -%% compliance with the License. You may obtain a copy of the License -%% at http://www.mozilla.org/MPL/ -%% -%% Software distributed under the License is distributed on an "AS IS" -%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See -%% the License for the specific language governing rights and -%% limitations under the License. -%% -%% The Original Code is RabbitMQ. -%% -%% The Initial Developer of the Original Code is GoPivotal, Inc. -%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. -%% - --module(truncate). - --define(ELLIPSIS_LENGTH, 3). - --record(params, {content, struct, content_dec, struct_dec}). - --export([log_event/2, term/2]). - --ifdef(TEST). --export([term_size/3]). --endif. - -log_event({Type, GL, {Pid, Format, Args}}, Params) - when Type =:= error orelse - Type =:= info_msg orelse - Type =:= warning_msg -> - {Type, GL, {Pid, Format, [term(T, Params) || T <- Args]}}; -log_event({Type, GL, {Pid, ReportType, Report}}, Params) - when Type =:= error_report orelse - Type =:= info_report orelse - Type =:= warning_report -> - {Type, GL, {Pid, ReportType, report(Report, Params)}}; -log_event(Event, _Params) -> - Event. - -report([[Thing]], Params) -> report([Thing], Params); -report(List, Params) when is_list(List) -> [case Item of - {K, V} -> {K, term(V, Params)}; - _ -> term(Item, Params) - end || Item <- List]; -report(Other, Params) -> term(Other, Params). - -term(Thing, {Max, {Content, Struct, ContentDec, StructDec}}) -> - case exceeds_size(Thing, Max) of - true -> term(Thing, true, #params{content = Content, - struct = Struct, - content_dec = ContentDec, - struct_dec = StructDec}); - false -> Thing - end. - -term(Bin, _AllowPrintable, #params{content = N}) - when (is_binary(Bin) orelse is_bitstring(Bin)) - andalso size(Bin) > N - ?ELLIPSIS_LENGTH -> - Suffix = without_ellipsis(N), - <<Head:Suffix/binary, _/bitstring>> = Bin, - <<Head/binary, <<"...">>/binary>>; -term(L, AllowPrintable, #params{struct = N} = Params) when is_list(L) -> - case AllowPrintable andalso io_lib:printable_list(L) of - true -> N2 = without_ellipsis(N), - case length(L) > N2 of - true -> string:left(L, N2) ++ "..."; - false -> L - end; - false -> shrink_list(L, Params) - end; -term(T, _AllowPrintable, Params) when is_tuple(T) -> - list_to_tuple(shrink_list(tuple_to_list(T), Params)); -term(T, _, _) -> - T. - -without_ellipsis(N) -> erlang:max(N - ?ELLIPSIS_LENGTH, 0). - -shrink_list(_, #params{struct = N}) when N =< 0 -> - ['...']; -shrink_list([], _) -> - []; -shrink_list([H|T], #params{content = Content, - struct = Struct, - content_dec = ContentDec, - struct_dec = StructDec} = Params) -> - [term(H, true, Params#params{content = Content - ContentDec, - struct = Struct - StructDec}) - | term(T, false, Params#params{struct = Struct - 1})]. - -%%---------------------------------------------------------------------------- - -%% We don't use erts_debug:flat_size/1 because that ignores binary -%% sizes. This is all going to be rather approximate though, these -%% sizes are probably not very "fair" but we are just trying to see if -%% we reach a fairly arbitrary limit anyway though. -exceeds_size(Thing, Max) -> - case term_size(Thing, Max, erlang:system_info(wordsize)) of - limit_exceeded -> true; - _ -> false - end. - -term_size(B, M, _W) when is_bitstring(B) -> lim(M, size(B)); -term_size(A, M, W) when is_atom(A) -> lim(M, 2 * W); -term_size(N, M, W) when is_number(N) -> lim(M, 2 * W); -term_size(T, M, W) when is_tuple(T) -> tuple_term_size( - T, M, 1, tuple_size(T), W); -term_size([], M, _W) -> - M; -term_size([H|T], M, W) -> - case term_size(H, M, W) of - limit_exceeded -> limit_exceeded; - M2 -> lim(term_size(T, M2, W), 2 * W) - end; -term_size(X, M, W) -> - lim(M, erts_debug:flat_size(X) * W). - -lim(S, T) when is_number(S) andalso S > T -> S - T; -lim(_, _) -> limit_exceeded. - -tuple_term_size(_T, limit_exceeded, _I, _S, _W) -> - limit_exceeded; -tuple_term_size(_T, M, I, S, _W) when I > S -> - M; -tuple_term_size(T, M, I, S, W) -> - tuple_term_size(T, lim(term_size(element(I, T), M, W), 2 * W), I + 1, S, W). diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 5b87c5be20..c94fb9ddab 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -709,6 +709,7 @@ subscribe(Config) -> ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + qos(Ch, 10, false), RaName = ra_name(QQ), publish(Ch, QQ), wait_for_messages_ready(Servers, RaName, 1), @@ -717,6 +718,14 @@ subscribe(Config) -> receive_basic_deliver(false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), + %% validate we can retrieve the consumers + [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ct:pal("Consumer ~p", [Consumer]), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), rabbit_ct_client_helpers:close_channel(Ch), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). @@ -1507,7 +1516,10 @@ basic_cancel(Config) -> wait_for_messages_pending_ack(Servers, RaName, 1), amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0) + wait_for_messages_pending_ack(Servers, RaName, 0), + [] = rpc:call(Server, ets, tab2list, [consumer_created]) + after 5000 -> + exit(basic_deliver_timeout) end. purge(Config) -> diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 56608e9af3..3263a733a9 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -4,6 +4,7 @@ -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). all() -> [ @@ -50,11 +51,16 @@ end_per_group(_, Config) -> Config. init_per_testcase(TestCase, Config) -> + meck:new(rabbit_quorum_queue, [passthrough]), + meck:expect(rabbit_quorum_queue, update_metrics, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, cancel_consumer_handler, + fun (_, _) -> ok end), ra_server_sup:remove_all(), ServerName2 = list_to_atom(atom_to_list(TestCase) ++ "2"), ServerName3 = list_to_atom(atom_to_list(TestCase) ++ "3"), + ClusterName = rabbit_misc:r("/", queue, atom_to_binary(TestCase, utf8)), [ - {cluster_name, TestCase}, + {cluster_name, ClusterName}, {uid, atom_to_binary(TestCase, utf8)}, {node_id, {TestCase, node()}}, {uid2, atom_to_binary(ServerName2, utf8)}, @@ -63,6 +69,10 @@ init_per_testcase(TestCase, Config) -> {node_id3, {ServerName3, node()}} | Config]. +end_per_testcase(_, Config) -> + meck:unload(), + Config. + basics(Config) -> ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), @@ -70,7 +80,7 @@ basics(Config) -> CustomerTag = UId, ok = start_cluster(ClusterName, [ServerId]), FState0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, FState0), + {ok, FState1} = rabbit_fifo_client:checkout(CustomerTag, 1, undefined, FState0), ra_log_wal:force_roll_over(ra_log_wal), % create segment the segment will trigger a snapshot @@ -167,7 +177,7 @@ duplicate_delivery(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), Fun = fun Loop(S0) -> receive @@ -201,7 +211,7 @@ usage(Config) -> ServerId = ?config(node_id, Config), ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(corr1, msg1, F1), {ok, F3} = rabbit_fifo_client:enqueue(corr2, msg2, F2), {_, _, _} = process_ra_events(F3, 50), @@ -256,7 +266,7 @@ detects_lost_delivery(Config) -> F000 = rabbit_fifo_client:init(ClusterName, [ServerId]), {ok, F00} = rabbit_fifo_client:enqueue(msg1, F000), {_, _, F0} = process_ra_events(F00, 100), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg2, F1), {ok, F3} = rabbit_fifo_client:enqueue(msg3, F2), % lose first delivery @@ -285,7 +295,8 @@ returns_after_down(Config) -> Self = self(), _Pid = spawn(fun () -> F = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, F), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 10, + undefined, F), Self ! checkout_done end), receive checkout_done -> ok after 1000 -> exit(checkout_done_timeout) end, @@ -327,8 +338,11 @@ handles_reject_notification(Config) -> CId = {UId1, self()}, ok = start_cluster(ClusterName, [ServerId1, ServerId2]), - _ = ra:process_command(ServerId1, {checkout, - {auto, 10, simple_prefetch}, CId}), + _ = ra:process_command(ServerId1, + rabbit_fifo:make_checkout( + CId, + {auto, 10, simple_prefetch}, + #{})), % reverse order - should try the first node in the list first F0 = rabbit_fifo_client:init(ClusterName, [ServerId2, ServerId1]), {ok, F1} = rabbit_fifo_client:enqueue(one, F0), @@ -346,20 +360,21 @@ discard(Config) -> ServerId = ?config(node_id, Config), UId = ?config(uid, Config), ClusterName = ?config(cluster_name, Config), - Conf = #{cluster_name => ClusterName, + Conf = #{cluster_name => ClusterName#resource.name, id => ServerId, uid => UId, log_init_args => #{data_dir => PrivDir, uid => UId}, initial_member => [], machine => {module, rabbit_fifo, - #{dead_letter_handler => + #{queue_resource => discard, + dead_letter_handler => {?MODULE, dead_letter_handler, [self()]}}}}, _ = ra:start_server(Conf), ok = ra:trigger_election(ServerId), _ = ra:members(ServerId), F0 = rabbit_fifo_client:init(ClusterName, [ServerId]), - {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, F0), + {ok, F1} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F0), {ok, F2} = rabbit_fifo_client:enqueue(msg1, F1), F3 = discard_next_delivery(F2, 500), {ok, empty, _F4} = rabbit_fifo_client:dequeue(<<"tag1">>, settled, F3), @@ -380,7 +395,7 @@ cancel_checkout(Config) -> ok = start_cluster(ClusterName, [ServerId]), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), {ok, F1} = rabbit_fifo_client:enqueue(m1, F0), - {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, F1), + {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), {ok, {_, {_, m1}}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), @@ -395,7 +410,7 @@ credit(Config) -> {ok, F2} = rabbit_fifo_client:enqueue(m2, F1), {_, _, F3} = process_ra_events(F2, [], 250), %% checkout with 0 prefetch - {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, F3), + {ok, F4} = rabbit_fifo_client:checkout(<<"tag">>, 0, credited, undefined, F3), %% assert no deliveries {_, _, F5} = process_ra_events0(F4, [], [], 250, fun @@ -464,7 +479,7 @@ test_queries(Config) -> receive stop -> ok end end), F0 = rabbit_fifo_client:init(ClusterName, [ServerId], 4), - {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, F0), + {ok, _} = rabbit_fifo_client:checkout(<<"tag">>, 1, undefined, F0), {ok, {_, Ready}, _} = ra:local_query(ServerId, fun rabbit_fifo:query_messages_ready/1), ?assertEqual(1, maps:size(Ready)), @@ -485,7 +500,7 @@ dead_letter_handler(Pid, Msgs) -> Pid ! {dead_letter, Msgs}. dequeue(Config) -> - ClusterName = ?config(priv_dir, Config), + ClusterName = ?config(cluster_name, Config), ServerId = ?config(node_id, Config), UId = ?config(uid, Config), Tag = UId, @@ -611,14 +626,15 @@ validate_process_down(Name, Num) -> end. start_cluster(ClusterName, ServerIds, RaFifoConfig) -> - {ok, Started, _} = ra:start_cluster(ClusterName, + {ok, Started, _} = ra:start_cluster(ClusterName#resource.name, {module, rabbit_fifo, RaFifoConfig}, ServerIds), ?assertEqual(length(Started), length(ServerIds)), ok. start_cluster(ClusterName, ServerIds) -> - start_cluster(ClusterName, ServerIds, #{}). + start_cluster(ClusterName, ServerIds, #{name => some_name, + queue_resource => ClusterName}). flush() -> receive diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 48e7b9aa7f..1c1ab42a9e 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -64,14 +64,14 @@ scenario1(_Config) -> E = c:pid(0,6720,1), Commands = [ - {checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {enqueue,E,2,msg2}, - {checkout,cancel,C1}, %% both on returns queue - {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 - {return,[0],C2}, %% E1 in returns, E2 with C2 - {return,[1],C2}, %% E2 in returns E1 with C2 - {settle,[2],C2} %% E2 with C2 + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,1,msg1), + make_enqueue(E,2,msg2), + make_checkout(C1, cancel), %% both on returns queue + make_checkout(C2, {auto,1,simple_prefetch}), + make_return(C2, [0]), %% E1 in returns, E2 with C2 + make_return(C2, [1]), %% E2 in returns E1 with C2 + make_settle(C2, [2]) %% E2 with C2 ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -80,13 +80,13 @@ scenario2(_Config) -> C1 = {<<>>, c:pid(0,346,1)}, C2 = {<<>>,c:pid(0,379,1)}, E = c:pid(0,327,1), - Commands = [{checkout,{auto,1,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {checkout,cancel,C1}, - {enqueue,E,2,msg2}, - {checkout,{auto,1,simple_prefetch},C2}, - {settle,[0],C1}, - {settle,[0],C2} + Commands = [make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,msg1), + make_checkout(C1, cancel), + make_enqueue(E,2,msg2), + make_checkout(C2, {auto,1,simple_prefetch}), + make_settle(C1, [0]), + make_settle(C2, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -94,22 +94,24 @@ scenario2(_Config) -> scenario3(_Config) -> C1 = {<<>>, c:pid(0,179,1)}, E = c:pid(0,176,1), - Commands = [{checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg1}, - {return,[0],C1}, - {enqueue,E,2,msg2}, - {enqueue,E,3,msg3}, - {settle,[1],C1}, - {settle,[2],C1}], + Commands = [make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E,1,msg1), + make_return(C1, [0]), + make_enqueue(E,2,msg2), + make_enqueue(E,3,msg3), + make_settle(C1, [1]), + make_settle(C1, [2]) + ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. scenario4(_Config) -> C1 = {<<>>, c:pid(0,179,1)}, E = c:pid(0,176,1), -Commands = [{checkout,{auto,1,simple_prefetch},C1}, - {enqueue,E,1,msg}, - {settle,[0],C1}], + Commands = [make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,msg), + make_settle(C1, [0]) + ], run_snapshot_test(?FUNCTION_NAME, Commands), ok. @@ -167,6 +169,7 @@ checkout_gen(Pid) -> -record(t, {state = rabbit_fifo:init(#{name => proper, + queue_resource => blah, shadow_copy_interval => 1}) :: rabbit_fifo:state(), index = 1 :: non_neg_integer(), %% raft index @@ -201,7 +204,7 @@ handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = {enqueue, Pid, MsgSeq, msg}, + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, msg), case When of enqueue -> do_apply(Cmd, T#t{enqueuers = Enqs}); @@ -218,7 +221,7 @@ handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> end, Cons0)) of [CId | _] -> Cons = maps:remove(CId, Cons0), - Cmd = {checkout, cancel, CId}, + Cmd = rabbit_fifo:make_checkout(CId, cancel, #{}), do_apply(Cmd, T#t{consumers = Cons}); _ -> T @@ -230,7 +233,13 @@ handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) -> T; _ -> Cons = maps:put(CId, ok, Cons0), - Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId}, + Cmd = rabbit_fifo:make_checkout(CId, + {auto, Prefetch, simple_prefetch}, + #{ack => true, + prefetch => Prefetch, + username => <<"user">>, + args => []}), + do_apply(Cmd, T#t{consumers = Cons}) end; handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) -> @@ -253,14 +262,19 @@ handle_op({input_event, requeue}, #t{effects = Effs} = T) -> handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> case queue:out(Effs) of {{value, {settle, MsgIds, CId}}, Q} -> - do_apply({Settlement, MsgIds, CId}, T#t{effects = Q}); - {{value, {enqueue, _, _, _} = Cmd}, Q} -> + Cmd = case Settlement of + settle -> rabbit_fifo:make_settle(CId, MsgIds); + return -> rabbit_fifo:make_return(CId, MsgIds); + discard -> rabbit_fifo:make_discard(CId, MsgIds) + end, + do_apply(Cmd, T#t{effects = Q}); + {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> do_apply(Cmd, T#t{effects = Q}); _ -> T end; handle_op(purge, T) -> - do_apply(purge, T). + do_apply(rabbit_fifo:make_purge(), T). do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, log = Log} = T) -> @@ -275,7 +289,7 @@ enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> MsgIds = [I || {I, _} <- Msgs], %% always make settle commands by default %% they can be changed depending on the input event later - Cmd = {settle, MsgIds, {CTag, P}}, + Cmd = rabbit_fifo:make_settle({CTag, P}, MsgIds), enq_effs(Rem, queue:in(Cmd, Q)); enq_effs([_ | Rem], Q) -> % ct:pal("enq_effs dropping ~w~n", [E]), @@ -310,18 +324,8 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), - % L = case Filtered of - % [] -> undefined; - % _ ->lists:last(Filtered) - % end, - - % ct:pal("running from snapshot: ~b to ~w" - % "~n~p~n", - % [SnapIdx, L, SnapState]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index - % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", - % [Name, SnapIdx, S, State, SnapState, Filtered]), ?assertEqual(State, S) end || {release_cursor, SnapIdx, SnapState} <- Effects], ok. @@ -342,7 +346,20 @@ run_log(InitState, Entries) -> test_init(Name) -> rabbit_fifo:init(#{name => Name, + queue_resource => blah, shadow_copy_interval => 0, metrics_handler => {?MODULE, metrics_handler, []}}). meta(Idx) -> #{index => Idx, term => 1}. + +make_checkout(Cid, Spec) -> + rabbit_fifo:make_checkout(Cid, Spec, #{}). + +make_enqueue(Pid, Seq, Msg) -> + rabbit_fifo:make_enqueue(Pid, Seq, Msg). + +make_settle(Cid, MsgIds) -> + rabbit_fifo:make_settle(Cid, MsgIds). + +make_return(Cid, MsgIds) -> + rabbit_fifo:make_return(Cid, MsgIds). diff --git a/test/unit_SUITE.erl b/test/unit_SUITE.erl index be9c8d8698..f74e18afb9 100644 --- a/test/unit_SUITE.erl +++ b/test/unit_SUITE.erl @@ -57,11 +57,6 @@ groups() -> check_shutdown_ignored ]}, table_codec, - {truncate, [parallel], [ - short_examples_exactly, - term_limit, - large_examples_for_size - ]}, unfold, {vm_memory_monitor, [parallel], [ parse_line_linux @@ -698,71 +693,6 @@ check_shutdown(SigStop, Iterations, ChildCount, SupTimeout) -> Res. %% --------------------------------------------------------------------------- -%% truncate. -%% --------------------------------------------------------------------------- - -short_examples_exactly(_Config) -> - F = fun (Term, Exp) -> - Exp = truncate:term(Term, {1, {10, 10, 5, 5}}), - Term = truncate:term(Term, {100000, {10, 10, 5, 5}}) - end, - FSmall = fun (Term, Exp) -> - Exp = truncate:term(Term, {1, {2, 2, 2, 2}}), - Term = truncate:term(Term, {100000, {2, 2, 2, 2}}) - end, - F([], []), - F("h", "h"), - F("hello world", "hello w..."), - F([[h,e,l,l,o,' ',w,o,r,l,d]], [[h,e,l,l,o,'...']]), - F([a|b], [a|b]), - F(<<"hello">>, <<"hello">>), - F([<<"hello world">>], [<<"he...">>]), - F(<<1:1>>, <<1:1>>), - F(<<1:81>>, <<0:56, "...">>), - F({{{{a}}},{b},c,d,e,f,g,h,i,j,k}, {{{'...'}},{b},c,d,e,f,g,h,i,j,'...'}), - FSmall({a,30,40,40,40,40}, {a,30,'...'}), - FSmall([a,30,40,40,40,40], [a,30,'...']), - P = spawn(fun() -> receive die -> ok end end), - F([0, 0.0, <<1:1>>, F, P], [0, 0.0, <<1:1>>, F, P]), - P ! die, - R = make_ref(), - F([R], [R]), - ok. - -term_limit(_Config) -> - W = erlang:system_info(wordsize), - S = <<"abc">>, - 1 = truncate:term_size(S, 4, W), - limit_exceeded = truncate:term_size(S, 3, W), - case 100 - truncate:term_size([S, S], 100, W) of - 22 -> ok; %% 32 bit - 38 -> ok %% 64 bit - end, - case 100 - truncate:term_size([S, [S]], 100, W) of - 30 -> ok; %% ditto - 54 -> ok - end, - limit_exceeded = truncate:term_size([S, S], 6, W), - ok. - -large_examples_for_size(_Config) -> - %% Real world values - Shrink = fun(Term) -> truncate:term(Term, {1, {1000, 100, 50, 5}}) end, - TestSize = fun(Term) -> - true = 5000000 < size(term_to_binary(Term)), - true = 500000 > size(term_to_binary(Shrink(Term))) - end, - TestSize(lists:seq(1, 5000000)), - TestSize(recursive_list(1000, 10)), - TestSize(recursive_list(5000, 20)), - TestSize(gb_sets:from_list([I || I <- lists:seq(1, 1000000)])), - TestSize(gb_trees:from_orddict([{I, I} || I <- lists:seq(1, 1000000)])), - ok. - -recursive_list(S, 0) -> lists:seq(1, S); -recursive_list(S, N) -> [recursive_list(S div N, N-1) || _ <- lists:seq(1, S)]. - -%% --------------------------------------------------------------------------- %% vm_memory_monitor. %% --------------------------------------------------------------------------- |
