diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-14 10:10:29 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-14 10:10:29 +0000 |
| commit | 46b6c2f303846e218964857e033ee730cf87bd66 (patch) | |
| tree | cd819a9f0b736b2f819eae113e45f3d1c9a4b72a /src | |
| parent | 0a21df17d97ead045bedd42bf2fd867687eeec17 (diff) | |
| download | rabbitmq-server-git-46b6c2f303846e218964857e033ee730cf87bd66.tar.gz | |
Implement consumer listing for quorum queues
Refactor rabbit_fifo internal commands to use records instead of plain
tuples to provide a little bit more compile time safety and make it
easier to extend in the future.
[#162584074]
Diffstat (limited to 'src')
| -rw-r--r-- | src/lqueue.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_amqqueue.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 433 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 51 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 64 |
5 files changed, 379 insertions, 237 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl index 1abe4e0b82..2b75ef4856 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -25,27 +25,32 @@ -define(QUEUE, queue). --export_type([?MODULE/0]). +-export_type([ + ?MODULE/0, + ?MODULE/1 + ]). --opaque ?MODULE() :: {non_neg_integer(), queue:queue()}. +-opaque ?MODULE() :: {non_neg_integer(), queue:queue(term())}. +-opaque ?MODULE(T) :: {non_neg_integer(), queue:queue(T)}. -type value() :: any(). --type result() :: 'empty' | {'value', value()}. +-type result(T) :: 'empty' | {'value', T}. --spec new() -> ?MODULE(). --spec drop(?MODULE()) -> ?MODULE(). --spec is_empty(?MODULE()) -> boolean(). --spec len(?MODULE()) -> non_neg_integer(). --spec in(value(), ?MODULE()) -> ?MODULE(). +-spec new() -> ?MODULE(_). +-spec drop(?MODULE(T)) -> ?MODULE(T). +-spec is_empty(?MODULE(_)) -> boolean(). +-spec len(?MODULE(_)) -> non_neg_integer(). +-spec in(T, ?MODULE(T)) -> ?MODULE(T). -spec in_r(value(), ?MODULE()) -> ?MODULE(). --spec out(?MODULE()) -> {result(), ?MODULE()}. --spec out_r(?MODULE()) -> {result(), ?MODULE()}. --spec join(?MODULE(), ?MODULE()) -> ?MODULE(). --spec foldl(fun ((value(), B) -> B), B, ?MODULE()) -> B. --spec foldr(fun ((value(), B) -> B), B, ?MODULE()) -> B. --spec from_list([value()]) -> ?MODULE(). --spec to_list(?MODULE()) -> [value()]. --spec peek(?MODULE()) -> result(). --spec peek_r(?MODULE()) -> result(). +-spec out(?MODULE(T)) -> {result(T), ?MODULE()}. +-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}. +-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B). +-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B. +-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B. +-spec from_list([T]) -> ?MODULE(T). +-spec to_list(?MODULE(T)) -> [T]. +% -spec peek(?MODULE()) -> result(). +-spec peek(?MODULE(T)) -> result(T). +-spec peek_r(?MODULE(T)) -> result(T). new() -> {0, ?QUEUE:new()}. diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 093d118a6b..016443bb73 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -919,8 +919,12 @@ notify_policy_changed(#amqqueue{pid = QPid, name = QName}) when ?IS_QUORUM(QPid) -> rabbit_quorum_queue:policy_changed(QName, QPid). -consumers(#amqqueue{ pid = QPid }) -> - delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). +consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}); +consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) -> + {ok, {_, Result}, _} = ra:local_query(QPid, + fun rabbit_fifo:query_consumers/1), + maps:values(Result). consumer_info_keys() -> ?CONSUMER_INFO_KEYS. @@ -1147,14 +1151,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP [rabbit_misc:rs(QName), Reason]) end. -basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, LimiterPid, - LimiterActive, ConsumerPrefetchCount, ConsumerTag, +basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, + LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, ActingUser, QState) -> ok = check_consume_arguments(QName, Args), - case delegate:invoke(QPid, {gen_server2, call, - [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, - Args, OkMsg, ActingUser}, infinity]}) of + case delegate:invoke(QPid, + {gen_server2, call, + [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive, + ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, + Args, OkMsg, ActingUser}, infinity]}) of ok -> {ok, QState}; Err -> @@ -1164,15 +1169,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid, _LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag, _ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) -> {error, global_qos_not_supported_for_queue_type}; -basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, NoAck, ChPid, - _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag, - ExclusiveConsume, Args, OkMsg, _ActingUser, QStates) -> +basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, + NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount, + ConsumerTag, ExclusiveConsume, Args, OkMsg, + ActingUser, QStates) -> ok = check_consume_arguments(QName, Args), QState0 = get_quorum_state(Id, QName, QStates), {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid, ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, + ActingUser, OkMsg, QState0), {ok, maps:put(Name, QState, QStates)}. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1f047027b9..2d5c267227 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -39,10 +39,21 @@ query_processes/1, query_ra_indexes/1, query_consumer_count/1, + query_consumers/1, usage/1, %% misc - dehydrate_state/1 + dehydrate_state/1, + + %% protocol helpers + make_enqueue/3, + make_checkout/3, + make_settle/2, + make_return/2, + make_discard/2, + make_credit/4, + make_purge/0, + make_update_state/1 ]). -type raw_msg() :: term(). @@ -93,19 +104,43 @@ {dequeue, settled | unsettled} | cancel. --type protocol() :: - {enqueue, Sender :: maybe(pid()), MsgSeq :: maybe(msg_seqno()), - Msg :: raw_msg()} | - {checkout, Spec :: checkout_spec(), Consumer :: consumer_id()} | - {settle, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {return, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {discard, MsgIds :: [msg_id()], Consumer :: consumer_id()} | - {credit, - Credit :: non_neg_integer(), - DeliveryCount :: non_neg_integer(), - Drain :: boolean(), - Consumer :: consumer_id()} | - purge. +-type consumer_meta() :: #{ack => boolean(), + username => binary(), + prefetch => non_neg_integer(), + args => list()}. +%% static meta data associated with a consumer + +%% command records representing all the protocol actions that are supported +-record(enqueue, {pid :: maybe(pid()), + seq :: maybe(msg_seqno()), + msg :: raw_msg()}). +-record(checkout, {consumer_id :: consumer_id(), + spec :: checkout_spec(), + meta :: consumer_meta()}). +-record(settle, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(return, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(discard, {consumer_id :: consumer_id(), + msg_ids :: [msg_id()]}). +-record(credit, {consumer_id :: consumer_id(), + credit :: non_neg_integer(), + delivery_count :: non_neg_integer(), + drain :: boolean()}). +-record(purge, {}). +-record(update_state, {config :: config()}). + + + +-opaque protocol() :: + #enqueue{} | + #checkout{} | + #settle{} | + #return{} | + #discard{} | + #credit{} | + #purge{} | + #update_state{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -120,7 +155,8 @@ -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, - {checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, + {meta = #{} :: consumer_meta(), + checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}}, next_msg_id = 0 :: msg_id(), % part of snapshot data %% max number of messages that can be sent %% decremented for each delivery @@ -158,7 +194,7 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'), + returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -196,6 +232,7 @@ -opaque state() :: #state{}. -type config() :: #{name := atom(), + queue_resource := rabbit_types:r('queue'), dead_letter_handler => applied_mfa(), become_leader_handler => applied_mfa(), shadow_copy_interval => non_neg_integer()}. @@ -203,7 +240,9 @@ -export_type([protocol/0, delivery/0, command/0, + credit_mode/0, consumer_tag/0, + consumer_meta/0, consumer_id/0, client_msg/0, msg/0, @@ -213,7 +252,7 @@ state/0, config/0]). --spec init(config()) -> {state(), ra_machine:effects()}. +-spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> update_state(Conf, #state{name = Name, @@ -232,7 +271,8 @@ update_state(Conf, State) -> -spec apply(ra_machine:command_meta_data(), command(), ra_machine:effects(), state()) -> {state(), ra_machine:effects(), Reply :: term()}. -apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> +apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq, + msg = RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of {ok, State0, Effects1} -> {State, Effects, ok} = checkout(State0, Effects1), @@ -240,7 +280,8 @@ apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> {duplicate, State, Effects} -> {State, Effects, ok} end; -apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0, +apply(#{index := RaftIdx}, + #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0} -> @@ -251,8 +292,8 @@ apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0, _ -> {State, Effects0, ok} end; -apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0, - #state{consumers = Cons0} = State0) -> +apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, + Effects0, #state{consumers = Cons0} = State0) -> case Cons0 of #{ConsumerId := Con0} -> {State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds, @@ -263,7 +304,7 @@ apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0, _ -> {State0, Effects0, ok} end; -apply(_, {return, MsgIds, ConsumerId}, Effects0, +apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0} = State) -> case Cons0 of #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> @@ -274,7 +315,8 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0, _ -> {State, Effects0, ok} end; -apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, +apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt, + drain = Drain, consumer_id = ConsumerId}, Effects0, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> case Cons0 of @@ -317,25 +359,29 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, %% credit for unknown consumer - just ignore {State0, Effects0, ok} end; -apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, +apply(_, #checkout{spec = {dequeue, _}}, Effects0, #state{messages = M, prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; -apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, +apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta, + consumer_id = ConsumerId}, Effects0, State0) -> % TODO: this clause could probably be optimised - State1 = update_consumer(ConsumerId, {once, 1, simple_prefetch}, State0), + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), % turn send msg effect into reply {success, _, MsgId, Msg, State2} = checkout_one(State1), % immediately settle - {State, Effects, _} = apply(Meta, {settle, [MsgId], ConsumerId}, + {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]), Effects0, State2), {State, Effects, {dequeue, {MsgId, Msg}}}; -apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer}, +apply(_, #checkout{spec = {dequeue, unsettled}, + meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, Effects0, State0) -> - State1 = update_consumer(Consumer, {once, 1, simple_prefetch}, State0), + State1 = update_consumer(ConsumerId, ConsumerMeta, + {once, 1, simple_prefetch}, State0), Effects1 = [{monitor, process, Pid} | Effects0], {State, Reply, Effects} = case checkout_one(State1) of {success, _, MsgId, Msg, S} -> @@ -346,16 +392,17 @@ apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer}, {S, empty, Effects1} end, {State, Effects, {dequeue, Reply}}; -apply(_, {checkout, cancel, ConsumerId}, Effects0, State0) -> +apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) -> {CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}), % TODO: here we should really demonitor the pid but _only_ if it has no % other consumers or enqueuers. checkout(State1, CancelEffects); -apply(_, {checkout, Spec, {_Tag, Pid} = ConsumerId}, Effects0, State0) -> - State1 = update_consumer(ConsumerId, Spec, State0), +apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId}, + Effects0, State0) -> + State1 = update_consumer(ConsumerId, Meta, Spec, State0), {State, Effects, Res} = checkout(State1, Effects0), {State, [{monitor, process, Pid} | Effects], Res}; -apply(#{index := RaftIdx}, purge, Effects0, +apply(#{index := RaftIdx}, #purge{}, Effects0, #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> Total = rabbit_fifo_index:size(Indexes), {State1, Effects1, _} = @@ -459,7 +506,7 @@ apply(_, {nodeup, Node}, Effects0, service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}; -apply(_, {update_state, Conf}, Effects, State) -> +apply(_, #update_state{config = Conf}, Effects, State) -> {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). @@ -567,6 +614,14 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) -> query_consumer_count(#state{consumers = Consumers}) -> maps:size(Consumers). +query_consumers(#state{consumers = Consumers}) -> + maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) -> + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)} + end, Consumers). %% other -spec usage(atom()) -> float(). @@ -998,11 +1053,12 @@ uniq_queue_in(Key, Queue) -> end. -update_consumer(ConsumerId, {Life, Credit, Mode}, +update_consumer(ConsumerId, Meta, {Life, Credit, Mode}, #state{consumers = Cons0, service_queue = ServiceQueue0} = State0) -> %% TODO: this logic may not be correct for updating a pre-existing consumer - Init = #consumer{lifetime = Life, credit = Credit, credit_mode = Mode}, + Init = #consumer{lifetime = Life, meta = Meta, + credit = Credit, credit_mode = Mode}, Cons = maps:update_with(ConsumerId, fun(S) -> %% remove any in-flight messages from @@ -1055,6 +1111,41 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), Con#consumer{checked_out = Checked}. +-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol(). +make_enqueue(Pid, Seq, Msg) -> + #enqueue{pid = Pid, seq = Seq, msg = Msg}. +-spec make_checkout(consumer_id(), + checkout_spec(), consumer_meta()) -> protocol(). +make_checkout(ConsumerId, Spec, Meta) -> + #checkout{consumer_id = ConsumerId, + spec = Spec, meta = Meta}. + +-spec make_settle(consumer_id(), [msg_id()]) -> protocol(). +make_settle(ConsumerId, MsgIds) -> + #settle{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_return(consumer_id(), [msg_id()]) -> protocol(). +make_return(ConsumerId, MsgIds) -> + #return{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_discard(consumer_id(), [msg_id()]) -> protocol(). +make_discard(ConsumerId, MsgIds) -> + #discard{consumer_id = ConsumerId, msg_ids = MsgIds}. + +-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(), + boolean()) -> protocol(). +make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> + #credit{consumer_id = ConsumerId, + credit = Credit, + delivery_count = DeliveryCount, + drain = Drain}. + +-spec make_purge() -> protocol(). +make_purge() -> #purge{}. + +-spec make_update_state(config()) -> protocol(). +make_update_state(Config) -> + #update_state{config = Config}. -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). @@ -1087,7 +1178,9 @@ enq_enq_checkout_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), {_State3, Effects, _} = - apply(meta(3), {checkout, {once, 2, simple_prefetch}, Cid}, [], State2), + apply(meta(3), + make_checkout(Cid, {once, 2, simple_prefetch}, #{}), + [], State2), ?ASSERT_EFF({monitor, _, _}, Effects), ?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects), ok. @@ -1097,7 +1190,8 @@ credit_enq_enq_checkout_settled_credit_test() -> {State1, _} = enq(1, 1, first, test_init(test)), {State2, _} = enq(2, 2, second, State1), {State3, Effects, _} = - apply(meta(3), {checkout, {auto, 1, credited}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}), + [], State2), ?ASSERT_EFF({monitor, _, _}, Effects), Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true; (_) -> false @@ -1127,12 +1221,13 @@ credit_with_drained_test() -> State0 = test_init(test), %% checkout with a single credit {State1, _, _} = - apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0), + apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}), + [], State0), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, delivery_count = 0}}}, State1), {State, _Effs, Result} = - apply(meta(3), {credit, 0, 5, true, Cid}, [], State1), + apply(meta(3), make_credit(Cid, 0, 5, true), [], State1), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), @@ -1147,12 +1242,13 @@ credit_and_drain_test() -> {State2, _} = enq(2, 2, second, State1), %% checkout without any initial credit (like AMQP 1.0 would) {State3, CheckEffs, _} = - apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}), + [], State2), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), {State4, Effects, {multi, [{send_credit_reply, 0}, {send_drained, [{?FUNCTION_NAME, 2}]}]}} = - apply(meta(4), {credit, 4, 0, true, Cid}, [], State3), + apply(meta(4), make_credit(Cid, 4, 0, true), [], State3), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), @@ -1171,7 +1267,8 @@ enq_enq_deq_test() -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value {_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State2), ok. enq_enq_deq_deq_settle_test() -> @@ -1180,9 +1277,11 @@ enq_enq_deq_deq_settle_test() -> {State2, _} = enq(2, 2, second, State1), % get returns a reply value {State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State2), {_State4, _Effects4, {dequeue, empty}} = - apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State3), ok. enq_enq_checkout_get_settled_test() -> @@ -1190,23 +1289,28 @@ enq_enq_checkout_get_settled_test() -> {State1, _} = enq(1, 1, first, test_init(test)), % get returns a reply value {_State2, _Effects, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. checkout_get_empty_test() -> Cid = {?FUNCTION_NAME, self()}, State = test_init(test), {_State2, [], {dequeue, empty}} = - apply(meta(1), {checkout, {dequeue, unsettled}, Cid}, [], State), + apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}), + [], State), ok. untracked_enq_deq_test() -> Cid = {?FUNCTION_NAME, self()}, State0 = test_init(test), - {State1, _, _} = apply(meta(1), {enqueue, undefined, undefined, first}, [], State0), + {State1, _, _} = apply(meta(1), + make_enqueue(undefined, undefined, first), [], State0), {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. + release_cursor_test() -> Cid = {?FUNCTION_NAME, self()}, {State1, _} = enq(1, 1, first, test_init(test)), @@ -1277,7 +1381,7 @@ return_non_existent_test() -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), % return non-existent - {_State2, [], _} = apply(meta(3), {return, [99], Cid}, [], State0), + {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0), ok. return_checked_out_test() -> @@ -1287,7 +1391,8 @@ return_checked_out_test() -> {send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} = check(Cid, 2, State0), % return - {_State2, [_, _], _} = apply(meta(3), {return, [MsgId], Cid}, [], State1), + {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]), + [], State1), ok. return_auto_checked_out_test() -> @@ -1300,7 +1405,7 @@ return_auto_checked_out_test() -> {send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} = check_auto(Cid, 2, State0), % return should include another delivery - {_State2, Effects, _} = apply(meta(3), {return, [MsgId], Cid}, [], State1), + {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _}, Effects), @@ -1313,12 +1418,15 @@ cancelled_checkout_out_test() -> {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), % cancelled checkout should return all pending messages to queue - {State2, _, _} = apply(meta(3), {checkout, cancel, Cid}, [], State1), + {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), + [], State1), + ?assertEqual(2, maps:size(State2#state.messages)), {State3, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State2), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2), + ?debugFmt("State3 ~p", [State3]), {_State, _, {dequeue, {_, {_, second}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3), ok. down_with_noproc_consumer_returns_unsettled_test() -> @@ -1394,7 +1502,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1), + {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -1411,7 +1519,8 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1), + {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), + [], State1), % assert mod call effect with appended reason and message ?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]}, Effects2), @@ -1424,7 +1533,7 @@ tick_test() -> {S1, _} = enq(2, 2, snd, S0), {S2, {MsgId, _}} = deq(3, Cid, unsettled, S1), {S3, {_, _}} = deq(4, Cid2, unsettled, S2), - {S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3), + {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3), [{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4), ok. @@ -1433,14 +1542,14 @@ enq_deq_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, settled}, Cid}, - {enqueue, self(), 3, three}, - {enqueue, self(), 4, four}, - {checkout, {dequeue, settled}, Cid}, - {enqueue, self(), 5, five}, - {checkout, {dequeue, settled}, Cid} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, settled}, #{}), + make_enqueue(self(), 3, three), + make_enqueue(self(), 4, four), + make_checkout(Cid, {dequeue, settled}, #{}), + make_enqueue(self(), 5, five), + make_checkout(Cid, {dequeue, settled}, #{}) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1450,10 +1559,10 @@ enq_deq_settle_snapshot_recover_test() -> % OthPid = spawn(fun () -> ok end), % Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Cid} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Cid, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1463,13 +1572,13 @@ enq_deq_settle_snapshot_recover_2_test() -> OthPid = spawn(fun () -> ok end), Oth = {<<"oth">>, OthPid}, Commands = [ - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, two}, - {checkout, {dequeue, unsettled}, Oth}, - {settle, [0], Oth} + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Cid, [0]), + make_enqueue(self(), 3, two), + make_checkout(Cid, {dequeue, unsettled}, #{}), + make_settle(Oth, [0]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1477,11 +1586,11 @@ snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {enqueue, self(), 3, three}, - purge + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_enqueue(self(), 3, three), + make_purge() ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1489,12 +1598,12 @@ enq_deq_return_settle_snapshot_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, %% to Cid - {checkout, {auto, 1, simple_prefetch}, Cid}, - {return, [0], Cid}, %% should be re-delivered to Cid - {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 - {settle, [1], Cid}, - {settle, [2], Cid} + make_enqueue(self(), 1, one), %% to Cid + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_return(Cid, [0]), %% should be re-delivered to Cid + make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 + make_settle(Cid, [1]), + make_settle(Cid, [2]) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1502,10 +1611,10 @@ return_prefix_msg_count_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, - {checkout, {auto, 1, simple_prefetch}, Cid}, - {checkout, cancel, Cid}, - {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2 + make_enqueue(self(), 1, one), + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_checkout(Cid, cancel, #{}), + make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2 ], Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), @@ -1518,16 +1627,16 @@ return_settle_snapshot_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {enqueue, self(), 1, one}, %% to Cid - {checkout, {auto, 1, simple_prefetch}, Cid}, - {return, [0], Cid}, %% should be re-delivered to Oth - {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 - {settle, [1], Cid}, - {return, [2], Cid}, - {settle, [3], Cid}, - {enqueue, self(), 3, three}, - purge, - {enqueue, self(), 4, four} + make_enqueue(self(), 1, one), %% to Cid + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + make_return(Cid, [0]), %% should be re-delivered to Oth + make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2 + make_settle(Cid, [1]), + make_return(Cid, [2]), + make_settle(Cid, [3]), + make_enqueue(self(), 3, three), + make_purge(), + make_enqueue(self(), 4, four) ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1535,13 +1644,13 @@ enq_check_settle_snapshot_recover_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {settle, [1], Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, three}, - {settle, [2], Cid} + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_settle(Cid, [1]), + make_settle(Cid, [0]), + make_enqueue(self(), 3, three), + make_settle(Cid, [2]) ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1550,13 +1659,13 @@ enq_check_settle_snapshot_purge_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {settle, [1], Cid}, - {settle, [0], Cid}, - {enqueue, self(), 3, three}, - purge + make_checkout(Cid, {auto, 2, simple_prefetch},#{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_settle(Cid, [1]), + make_settle(Cid, [0]), + make_enqueue(self(), 3, three), + make_purge() ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1566,38 +1675,18 @@ enq_check_settle_duplicate_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 2, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, %% 0 - {enqueue, self(), 2, two}, %% 0 - {settle, [0], Cid}, - {settle, [1], Cid}, - {settle, [1], Cid}, - {enqueue, self(), 3, three}, - {settle, [2], Cid} + make_checkout(Cid, {auto, 2, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), %% 0 + make_enqueue(self(), 2, two), %% 0 + make_settle(Cid, [0]), + make_settle(Cid, [1]), + make_settle(Cid, [1]), + make_enqueue(self(), 3, three), + make_settle(Cid, [2]) ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). - -multi_return_snapshot_test() -> - %% this was discovered using property testing - C1 = {<<>>, c:pid(0,6723,1)}, - C2 = {<<0>>,c:pid(0,6723,1)}, - E = c:pid(0,6720,1), - Commands = [ - {checkout,{auto,2,simple_prefetch},C1}, - {enqueue,E,1,msg}, - {enqueue,E,2,msg}, - {checkout,cancel,C1}, %% both on returns queue - {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 - {return,[0],C2}, %% E1 in returns, E2 with C2 - {return,[1],C2}, %% E2 in returns E1 with C2 - {settle,[2],C2} %% E2 with C2 - ], - run_snapshot_test(?FUNCTION_NAME, Commands), - ok. - - run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that @@ -1634,11 +1723,11 @@ delivery_query_returns_deliveries_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ - {checkout, {auto, 5, simple_prefetch}, Cid}, - {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {enqueue, self(), 3, tre}, - {enqueue, self(), 4, for} + make_checkout(Cid, {auto, 5, simple_prefetch}, #{}), + make_enqueue(self(), 1, one), + make_enqueue(self(), 2, two), + make_enqueue(self(), 3, tre), + make_enqueue(self(), 4, for) ], Indexes = lists:seq(1, length(Commands)), Entries = lists:zip(Indexes, Commands), @@ -1654,7 +1743,8 @@ pending_enqueue_is_enqueued_on_down_test() -> {State0, _} = enq(1, 2, first, test_init(test)), {State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0), {_State2, _, {dequeue, {0, {_, first}}}} = - apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1), + apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), + [], State1), ok. duplicate_delivery_test() -> @@ -1699,11 +1789,11 @@ state_enter_montors_and_notifications_test() -> purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), - {State2, _, {purge, 1}} = apply(meta(2), purge, [], State1), + {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1), {State3, _} = enq(3, 2, second, State2), % get returns a reply value {_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} = - apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3), + apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3), ok. purge_with_checkout_test() -> @@ -1711,7 +1801,7 @@ purge_with_checkout_test() -> {State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)), {State1, _} = enq(2, 1, first, State0), {State2, _} = enq(3, 2, second, State1), - {State3, _, {purge, 2}} = apply(meta(2), purge, [], State2), + {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), ?assertEqual(0, maps:size(Checked)), ok. @@ -1721,36 +1811,47 @@ meta(Idx) -> enq(Idx, MsgSeq, Msg, State) -> strip_reply( - apply(meta(Idx), {enqueue, self(), MsgSeq, Msg}, [], State)). + apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)). deq(Idx, Cid, Settlement, State0) -> {State, _, {dequeue, Msg}} = - apply(meta(Idx), {checkout, {dequeue, Settlement}, Cid}, [], State0), + apply(meta(Idx), + make_checkout(Cid, {dequeue, Settlement}, #{}), + [], State0), {State, Msg}. check_n(Cid, Idx, N, State) -> - strip_reply(apply(meta(Idx), - {checkout, {auto, N, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {auto, N, simple_prefetch}, #{}), + [], State)). check(Cid, Idx, State) -> - strip_reply(apply(meta(Idx), - {checkout, {once, 1, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {once, 1, simple_prefetch}, #{}), + [], State)). check_auto(Cid, Idx, State) -> - strip_reply(apply(meta(Idx), - {checkout, {auto, 1, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {auto, 1, simple_prefetch}, #{}), + [], State)). check(Cid, Idx, Num, State) -> - strip_reply(apply(meta(Idx), - {checkout, {once, Num, simple_prefetch}, Cid}, [], State)). + strip_reply( + apply(meta(Idx), + make_checkout(Cid, {once, Num, simple_prefetch}, #{}), + [], State)). settle(Cid, Idx, MsgId, State) -> - strip_reply(apply(meta(Idx), {settle, [MsgId], Cid}, [], State)). + strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)). credit(Cid, Idx, Credit, DelCnt, Drain, State) -> - strip_reply(apply(meta(Idx), {credit, Credit, DelCnt, Drain, Cid}, [], State)). + strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain), + [], State)). -strip_reply({State, Effects, _Replu}) -> +strip_reply({State, Effects, _Reply}) -> {State, Effects}. run_log(InitState, Entries) -> diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 635d85be4a..9cdb1dfbe7 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -24,8 +24,8 @@ init/2, init/3, init/5, - checkout/3, checkout/4, + checkout/5, cancel_checkout/2, enqueue/2, enqueue/3, @@ -42,6 +42,7 @@ ]). -include_lib("ra/include/ra.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -define(SOFT_LIMIT, 256). @@ -51,7 +52,7 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. --record(consumer, {last_msg_id :: seq(), +-record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). -record(state, {cluster_name :: ra_cluster_name(), @@ -98,7 +99,7 @@ init(ClusterName, Servers) -> %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. -spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). -init(ClusterName, Servers, SoftLimit) -> +init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, servers = Servers, @@ -107,7 +108,7 @@ init(ClusterName, Servers, SoftLimit) -> -spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). -init(ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) -> +init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, servers = Servers, @@ -135,7 +136,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow, Node = pick_node(State0), {Next, State1} = next_enqueue_seq(State0), % by default there is no correlation id - Cmd = {enqueue, self(), Next, Msg}, + Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg), case send_command(Node, Correlation, Cmd, low, State1) of {slow, _} = Ret when not Slow -> BlockFun(), @@ -177,8 +178,11 @@ enqueue(Msg, State) -> dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> Node = pick_node(State0), ConsumerId = consumer_id(ConsumerTag), - case ra:process_command(Node, {checkout, {dequeue, Settlement}, - ConsumerId}, Timeout) of + case ra:process_command(Node, + rabbit_fifo:make_checkout(ConsumerId, + {dequeue, Settlement}, + #{}), + Timeout) of {ok, {dequeue, Reply}, Leader} -> {ok, Reply, State0#state{leader = Leader}}; Err -> @@ -198,7 +202,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) -> {ok, state()}. settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), - Cmd = {settle, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -232,7 +236,7 @@ settle(ConsumerTag, [_|_] = MsgIds, return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), % TODO: make rabbit_fifo return support lists of message ids - Cmd = {return, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -265,7 +269,7 @@ return(ConsumerTag, [_|_] = MsgIds, {ok | slow, state()}. discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) -> Node = pick_node(State0), - Cmd = {discard, MsgIds, consumer_id(ConsumerTag)}, + Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -299,9 +303,10 @@ discard(ConsumerTag, [_|_] = MsgIds, %% %% @returns `{ok, State}' or `{error | timeout, term()}' -spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), + rabbit_fifo:consumer_meta(), state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, State0) -> - checkout(ConsumerTag, NumUnsettled, simple_prefetch, State0). +checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) -> + checkout(ConsumerTag, NumUnsettled, simple_prefetch, ConsumerInfo, State0). %% @doc Register with the rabbit_fifo queue to "checkout" messages as they %% become available. @@ -319,13 +324,17 @@ checkout(ConsumerTag, NumUnsettled, State0) -> %% @param State The {@module} state. %% %% @returns `{ok, State}' or `{error | timeout, term()}' --spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(), +-spec checkout(rabbit_fifo:consumer_tag(), + NumUnsettled :: non_neg_integer(), CreditMode :: rabbit_fifo:credit_mode(), + Meta :: rabbit_fifo:consumer_meta(), state()) -> {ok, state()} | {error | timeout, term()}. -checkout(ConsumerTag, NumUnsettled, CreditMode, State0) -> +checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, State0) -> Servers = sorted_servers(State0), ConsumerId = {ConsumerTag, self()}, - Cmd = {checkout, {auto, NumUnsettled, CreditMode}, ConsumerId}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, + {auto, NumUnsettled, CreditMode}, + Meta), try_process_command(Servers, Cmd, State0). %% @doc Provide credit to the queue @@ -348,8 +357,8 @@ credit(ConsumerTag, Credit, Drain, %% add one as it is 0 indexed C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}), Node = pick_node(State0), - Cmd = {credit, Credit, C#consumer.last_msg_id + 1, Drain, ConsumerId}, - ct:pal("sending credit ~w", [Cmd]), + Cmd = rabbit_fifo:make_credit(ConsumerId, Credit, + C#consumer.last_msg_id + 1, Drain), case send_command(Node, undefined, Cmd, normal, State0) of {slow, S} -> % turn slow into ok for this function @@ -372,7 +381,7 @@ credit(ConsumerTag, Credit, Drain, cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> Servers = sorted_servers(State0), ConsumerId = {ConsumerTag, self()}, - Cmd = {checkout, cancel, ConsumerId}, + Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}), State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)}, try_process_command(Servers, Cmd, State). @@ -380,7 +389,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) -> %% of messages purged. -spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}. purge(Node) -> - case ra:process_command(Node, purge) of + case ra:process_command(Node, rabbit_fifo:make_purge()) of {ok, {purge, Reply}, _} -> {ok, Reply}; Err -> @@ -393,7 +402,7 @@ cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. update_machine_state(Node, Conf) -> - case ra:process_command(Node, {update_state, Conf}) of + case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of {ok, ok, _} -> ok; Err -> @@ -520,7 +529,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) -> -spec untracked_enqueue([ra_server_id()], term()) -> ok. untracked_enqueue([Node | _], Msg) -> - Cmd = {enqueue, undefined, undefined, Msg}, + Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg), ok = ra:pipeline_command(Node, Cmd), ok. diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 47b8a6f643..bb8af13b9d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -19,7 +19,7 @@ -export([init_state/2, handle_event/2]). -export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]). -export([info/1, info/2, stat/1, infos/1]). --export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]). +-export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]). -export([credit/4]). -export([purge/1]). -export([stateless_deliver/2, deliver/3]). @@ -47,12 +47,9 @@ -spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) -> {'internal', Correlators :: [term()], rabbit_fifo_client:state()} | {rabbit_fifo:client_msg(), rabbit_fifo_client:state()}. --spec declare(rabbit_types:amqqueue()) -> {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()}. -spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() | {'absent', rabbit_types:amqqueue(), atom()}]. -spec stop(rabbit_types:vhost()) -> 'ok'. --spec delete(rabbit_types:amqqueue(), boolean(), boolean(), rabbit_types:username()) -> - {'ok', QLen :: non_neg_integer()}. -spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> @@ -61,15 +58,9 @@ rabbit_fifo_client:state()) -> {'ok', 'empty', rabbit_fifo_client:state()} | {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. --spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(), - ConsumerPrefetchCount :: non_neg_integer(), rabbit_types:ctag(), - ExclusiveConsume :: boolean(), Args :: rabbit_framing:amqp_table(), - any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. --spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) -> - rabbit_fifo_client:state(). -spec info(rabbit_types:amqqueue()) -> rabbit_types:infos(). -spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos(). -spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). @@ -95,7 +86,7 @@ -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> rabbit_fifo_client:state(). -init_state({Name, _}, QName) -> +init_state({Name, _}, QName = #resource{}) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), %% This lookup could potentially return an {error, not_found}, but we do not %% know what to do if the queue has `disappeared`. Let it crash. @@ -104,13 +95,16 @@ init_state({Name, _}, QName) -> %% Ensure the leader is listed first Servers0 = [{Name, N} || N <- Nodes], Servers = [Leader | lists:delete(Leader, Servers0)], - rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, + rabbit_fifo_client:init(QName, Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). handle_event({ra_event, From, Evt}, QState) -> rabbit_fifo_client:handle_ra_event(From, Evt, QState). +-spec declare(rabbit_types:amqqueue()) -> + {'new', rabbit_types:amqqueue()} | + {existing, rabbit_types:amqqueue()}. declare(#amqqueue{name = QName, durable = Durable, auto_delete = AutoDelete, @@ -244,9 +238,9 @@ recover(Queues) -> RaNodes = [{Name, Node} || Node <- Nodes], case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of ok -> ok; - Err -> + Err2 -> rabbit_log:warning("recover: quorum queue ~w could not" - " be started ~w", [Name, Err]), + " be started ~w", [Name, Err2]), ok end; {error, {already_started, _}} -> @@ -272,7 +266,12 @@ stop(VHost) -> _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], ok. -delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, +-spec delete(rabbit_types:amqqueue(), + boolean(), boolean(), + rabbit_types:username()) -> + {ok, QLen :: non_neg_integer()}. +delete(#amqqueue{type = quorum, pid = {Name, _}, + name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -342,17 +341,35 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, {error, timeout} end. +-spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(), + ConsumerPrefetchCount :: non_neg_integer(), + rabbit_types:ctag(), ExclusiveConsume :: boolean(), + Args :: rabbit_framing:amqp_table(), ActingUser :: binary(), + any(), rabbit_fifo_client:state()) -> + {'ok', rabbit_fifo_client:state()}. basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid, - ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg, - QState0) -> + ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args, + ActingUser, OkMsg, QState0) -> + %% TODO: validate consumer arguments + %% currently quorum queues do not support any arguments maybe_send_reply(ChPid, OkMsg), - %% A prefetch count of 0 means no limitation, let's make it into something large for ra + ConsumerTag = quorum_ctag(ConsumerTag0), + %% A prefetch count of 0 means no limitation, + %% let's make it into something large for ra Prefetch = case ConsumerPrefetchCount of 0 -> 2000; Other -> Other end, - {ok, QState} = rabbit_fifo_client:checkout(quorum_ctag(ConsumerTag), - Prefetch, QState0), + %% consumer info is used to describe the consumer properties + ConsumerMeta = #{ack => not NoAck, + prefetch => ConsumerPrefetchCount, + args => Args, + username => ActingUser}, + {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag, + Prefetch, + ConsumerMeta, + QState0), + %% TODO: emit as rabbit_fifo effect rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume, not NoAck, QName, ConsumerPrefetchCount, Args), @@ -366,6 +383,9 @@ stateless_deliver(ServerId, Delivery) -> ok = rabbit_fifo_client:untracked_enqueue([ServerId], Delivery#delivery.message). +-spec deliver(Confirm :: boolean(), rabbit_types:delivery(), + rabbit_fifo_client:state()) -> + {ok | slow, rabbit_fifo_client:state()}. deliver(false, Delivery, QState0) -> rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0); deliver(true, Delivery, QState0) -> @@ -401,8 +421,8 @@ cleanup_data_dir() -> <- rabbit_amqqueue:list_by_type(quorum), lists:member(node(), Nodes)], Registered = ra_directory:list_registered(), - [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, - not lists:member(Name, Names)], + _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered, + not lists:member(Name, Names)], ok. maybe_delete_data_dir(UId) -> |
