summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-12-14 10:10:29 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-14 10:10:29 +0000
commit46b6c2f303846e218964857e033ee730cf87bd66 (patch)
treecd819a9f0b736b2f819eae113e45f3d1c9a4b72a /src
parent0a21df17d97ead045bedd42bf2fd867687eeec17 (diff)
downloadrabbitmq-server-git-46b6c2f303846e218964857e033ee730cf87bd66.tar.gz
Implement consumer listing for quorum queues
Refactor rabbit_fifo internal commands to use records instead of plain tuples to provide a little bit more compile time safety and make it easier to extend in the future. [#162584074]
Diffstat (limited to 'src')
-rw-r--r--src/lqueue.erl39
-rw-r--r--src/rabbit_amqqueue.erl29
-rw-r--r--src/rabbit_fifo.erl433
-rw-r--r--src/rabbit_fifo_client.erl51
-rw-r--r--src/rabbit_quorum_queue.erl64
5 files changed, 379 insertions, 237 deletions
diff --git a/src/lqueue.erl b/src/lqueue.erl
index 1abe4e0b82..2b75ef4856 100644
--- a/src/lqueue.erl
+++ b/src/lqueue.erl
@@ -25,27 +25,32 @@
-define(QUEUE, queue).
--export_type([?MODULE/0]).
+-export_type([
+ ?MODULE/0,
+ ?MODULE/1
+ ]).
--opaque ?MODULE() :: {non_neg_integer(), queue:queue()}.
+-opaque ?MODULE() :: {non_neg_integer(), queue:queue(term())}.
+-opaque ?MODULE(T) :: {non_neg_integer(), queue:queue(T)}.
-type value() :: any().
--type result() :: 'empty' | {'value', value()}.
+-type result(T) :: 'empty' | {'value', T}.
--spec new() -> ?MODULE().
--spec drop(?MODULE()) -> ?MODULE().
--spec is_empty(?MODULE()) -> boolean().
--spec len(?MODULE()) -> non_neg_integer().
--spec in(value(), ?MODULE()) -> ?MODULE().
+-spec new() -> ?MODULE(_).
+-spec drop(?MODULE(T)) -> ?MODULE(T).
+-spec is_empty(?MODULE(_)) -> boolean().
+-spec len(?MODULE(_)) -> non_neg_integer().
+-spec in(T, ?MODULE(T)) -> ?MODULE(T).
-spec in_r(value(), ?MODULE()) -> ?MODULE().
--spec out(?MODULE()) -> {result(), ?MODULE()}.
--spec out_r(?MODULE()) -> {result(), ?MODULE()}.
--spec join(?MODULE(), ?MODULE()) -> ?MODULE().
--spec foldl(fun ((value(), B) -> B), B, ?MODULE()) -> B.
--spec foldr(fun ((value(), B) -> B), B, ?MODULE()) -> B.
--spec from_list([value()]) -> ?MODULE().
--spec to_list(?MODULE()) -> [value()].
--spec peek(?MODULE()) -> result().
--spec peek_r(?MODULE()) -> result().
+-spec out(?MODULE(T)) -> {result(T), ?MODULE()}.
+-spec out_r(?MODULE(T)) -> {result(T), ?MODULE()}.
+-spec join(?MODULE(A), ?MODULE(B)) -> ?MODULE(A | B).
+-spec foldl(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+-spec foldr(fun ((T, B) -> B), B, ?MODULE(T)) -> B.
+-spec from_list([T]) -> ?MODULE(T).
+-spec to_list(?MODULE(T)) -> [T].
+% -spec peek(?MODULE()) -> result().
+-spec peek(?MODULE(T)) -> result(T).
+-spec peek_r(?MODULE(T)) -> result(T).
new() -> {0, ?QUEUE:new()}.
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 093d118a6b..016443bb73 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -919,8 +919,12 @@ notify_policy_changed(#amqqueue{pid = QPid,
name = QName}) when ?IS_QUORUM(QPid) ->
rabbit_quorum_queue:policy_changed(QName, QPid).
-consumers(#amqqueue{ pid = QPid }) ->
- delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}).
+consumers(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) ->
+ delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]});
+consumers(#amqqueue{pid = QPid}) when ?IS_QUORUM(QPid) ->
+ {ok, {_, Result}, _} = ra:local_query(QPid,
+ fun rabbit_fifo:query_consumers/1),
+ maps:values(Result).
consumer_info_keys() -> ?CONSUMER_INFO_KEYS.
@@ -1147,14 +1151,15 @@ basic_get(#amqqueue{pid = {Name, _} = Id, type = quorum, name = QName} = Q, _ChP
[rabbit_misc:rs(QName), Reason])
end.
-basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid, LimiterPid,
- LimiterActive, ConsumerPrefetchCount, ConsumerTag,
+basic_consume(#amqqueue{pid = QPid, name = QName, type = classic}, NoAck, ChPid,
+ LimiterPid, LimiterActive, ConsumerPrefetchCount, ConsumerTag,
ExclusiveConsume, Args, OkMsg, ActingUser, QState) ->
ok = check_consume_arguments(QName, Args),
- case delegate:invoke(QPid, {gen_server2, call,
- [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
- Args, OkMsg, ActingUser}, infinity]}) of
+ case delegate:invoke(QPid,
+ {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid, LimiterActive,
+ ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume,
+ Args, OkMsg, ActingUser}, infinity]}) of
ok ->
{ok, QState};
Err ->
@@ -1164,15 +1169,17 @@ basic_consume(#amqqueue{type = quorum}, _NoAck, _ChPid,
_LimiterPid, true, _ConsumerPrefetchCount, _ConsumerTag,
_ExclusiveConsume, _Args, _OkMsg, _ActingUser, _QStates) ->
{error, global_qos_not_supported_for_queue_type};
-basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q, NoAck, ChPid,
- _LimiterPid, _LimiterActive, ConsumerPrefetchCount, ConsumerTag,
- ExclusiveConsume, Args, OkMsg, _ActingUser, QStates) ->
+basic_consume(#amqqueue{pid = {Name, _} = Id, name = QName, type = quorum} = Q,
+ NoAck, ChPid, _LimiterPid, _LimiterActive, ConsumerPrefetchCount,
+ ConsumerTag, ExclusiveConsume, Args, OkMsg,
+ ActingUser, QStates) ->
ok = check_consume_arguments(QName, Args),
QState0 = get_quorum_state(Id, QName, QStates),
{ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
ConsumerPrefetchCount,
ConsumerTag,
ExclusiveConsume, Args,
+ ActingUser,
OkMsg, QState0),
{ok, maps:put(Name, QState, QStates)}.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 1f047027b9..2d5c267227 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -39,10 +39,21 @@
query_processes/1,
query_ra_indexes/1,
query_consumer_count/1,
+ query_consumers/1,
usage/1,
%% misc
- dehydrate_state/1
+ dehydrate_state/1,
+
+ %% protocol helpers
+ make_enqueue/3,
+ make_checkout/3,
+ make_settle/2,
+ make_return/2,
+ make_discard/2,
+ make_credit/4,
+ make_purge/0,
+ make_update_state/1
]).
-type raw_msg() :: term().
@@ -93,19 +104,43 @@
{dequeue, settled | unsettled} |
cancel.
--type protocol() ::
- {enqueue, Sender :: maybe(pid()), MsgSeq :: maybe(msg_seqno()),
- Msg :: raw_msg()} |
- {checkout, Spec :: checkout_spec(), Consumer :: consumer_id()} |
- {settle, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {return, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {discard, MsgIds :: [msg_id()], Consumer :: consumer_id()} |
- {credit,
- Credit :: non_neg_integer(),
- DeliveryCount :: non_neg_integer(),
- Drain :: boolean(),
- Consumer :: consumer_id()} |
- purge.
+-type consumer_meta() :: #{ack => boolean(),
+ username => binary(),
+ prefetch => non_neg_integer(),
+ args => list()}.
+%% static meta data associated with a consumer
+
+%% command records representing all the protocol actions that are supported
+-record(enqueue, {pid :: maybe(pid()),
+ seq :: maybe(msg_seqno()),
+ msg :: raw_msg()}).
+-record(checkout, {consumer_id :: consumer_id(),
+ spec :: checkout_spec(),
+ meta :: consumer_meta()}).
+-record(settle, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(return, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(discard, {consumer_id :: consumer_id(),
+ msg_ids :: [msg_id()]}).
+-record(credit, {consumer_id :: consumer_id(),
+ credit :: non_neg_integer(),
+ delivery_count :: non_neg_integer(),
+ drain :: boolean()}).
+-record(purge, {}).
+-record(update_state, {config :: config()}).
+
+
+
+-opaque protocol() ::
+ #enqueue{} |
+ #checkout{} |
+ #settle{} |
+ #return{} |
+ #discard{} |
+ #credit{} |
+ #purge{} |
+ #update_state{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -120,7 +155,8 @@
-define(USE_AVG_HALF_LIFE, 10000.0).
-record(consumer,
- {checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
+ {meta = #{} :: consumer_meta(),
+ checked_out = #{} :: #{msg_id() => {msg_in_id(), indexed_msg()}},
next_msg_id = 0 :: msg_id(), % part of snapshot data
%% max number of messages that can be sent
%% decremented for each delivery
@@ -158,7 +194,7 @@
next_msg_num = 1 :: msg_in_id(),
% list of returned msg_in_ids - when checking out it picks from
% this list first before taking low_msg_num
- returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'),
+ returns = lqueue:new() :: lqueue:lqueue('$prefix_msg' | msg_in_id()),
% a counter of enqueues - used to trigger shadow copy points
enqueue_count = 0 :: non_neg_integer(),
% a map containing all the live processes that have ever enqueued
@@ -196,6 +232,7 @@
-opaque state() :: #state{}.
-type config() :: #{name := atom(),
+ queue_resource := rabbit_types:r('queue'),
dead_letter_handler => applied_mfa(),
become_leader_handler => applied_mfa(),
shadow_copy_interval => non_neg_integer()}.
@@ -203,7 +240,9 @@
-export_type([protocol/0,
delivery/0,
command/0,
+ credit_mode/0,
consumer_tag/0,
+ consumer_meta/0,
consumer_id/0,
client_msg/0,
msg/0,
@@ -213,7 +252,7 @@
state/0,
config/0]).
--spec init(config()) -> {state(), ra_machine:effects()}.
+-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
update_state(Conf, #state{name = Name,
@@ -232,7 +271,8 @@ update_state(Conf, State) ->
-spec apply(ra_machine:command_meta_data(), command(),
ra_machine:effects(), state()) ->
{state(), ra_machine:effects(), Reply :: term()}.
-apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
+apply(#{index := RaftIdx}, #enqueue{pid = From, seq = Seq,
+ msg = RawMsg}, Effects0, State00) ->
case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of
{ok, State0, Effects1} ->
{State, Effects, ok} = checkout(State0, Effects1),
@@ -240,7 +280,8 @@ apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) ->
{duplicate, State, Effects} ->
{State, Effects, ok}
end;
-apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0,
+apply(#{index := RaftIdx},
+ #settle{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0} ->
@@ -251,8 +292,8 @@ apply(#{index := RaftIdx}, {settle, MsgIds, ConsumerId}, Effects0,
_ ->
{State, Effects0, ok}
end;
-apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0,
- #state{consumers = Cons0} = State0) ->
+apply(#{index := RaftIdx}, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
+ Effects0, #state{consumers = Cons0} = State0) ->
case Cons0 of
#{ConsumerId := Con0} ->
{State, Effects, Res} = complete_and_checkout(RaftIdx, MsgIds,
@@ -263,7 +304,7 @@ apply(#{index := RaftIdx}, {discard, MsgIds, ConsumerId}, Effects0,
_ ->
{State0, Effects0, ok}
end;
-apply(_, {return, MsgIds, ConsumerId}, Effects0,
+apply(_, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0} = State) ->
case Cons0 of
#{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
@@ -274,7 +315,8 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0,
_ ->
{State, Effects0, ok}
end;
-apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
+apply(_, #credit{credit = NewCredit, delivery_count = RemoteDelCnt,
+ drain = Drain, consumer_id = ConsumerId}, Effects0,
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
case Cons0 of
@@ -317,25 +359,29 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0,
%% credit for unknown consumer - just ignore
{State0, Effects0, ok}
end;
-apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0,
+apply(_, #checkout{spec = {dequeue, _}}, Effects0,
#state{messages = M,
prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 ->
%% FIXME: also check if there are returned messages
%% TODO do we need metric visibility of empty get requests?
{State0, Effects0, {dequeue, empty}};
-apply(Meta, {checkout, {dequeue, settled}, ConsumerId},
+apply(Meta, #checkout{spec = {dequeue, settled}, meta = ConsumerMeta,
+ consumer_id = ConsumerId},
Effects0, State0) ->
% TODO: this clause could probably be optimised
- State1 = update_consumer(ConsumerId, {once, 1, simple_prefetch}, State0),
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
% turn send msg effect into reply
{success, _, MsgId, Msg, State2} = checkout_one(State1),
% immediately settle
- {State, Effects, _} = apply(Meta, {settle, [MsgId], ConsumerId},
+ {State, Effects, _} = apply(Meta, make_settle(ConsumerId, [MsgId]),
Effects0, State2),
{State, Effects, {dequeue, {MsgId, Msg}}};
-apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer},
+apply(_, #checkout{spec = {dequeue, unsettled},
+ meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId},
Effects0, State0) ->
- State1 = update_consumer(Consumer, {once, 1, simple_prefetch}, State0),
+ State1 = update_consumer(ConsumerId, ConsumerMeta,
+ {once, 1, simple_prefetch}, State0),
Effects1 = [{monitor, process, Pid} | Effects0],
{State, Reply, Effects} = case checkout_one(State1) of
{success, _, MsgId, Msg, S} ->
@@ -346,16 +392,17 @@ apply(_, {checkout, {dequeue, unsettled}, {_Tag, Pid} = Consumer},
{S, empty, Effects1}
end,
{State, Effects, {dequeue, Reply}};
-apply(_, {checkout, cancel, ConsumerId}, Effects0, State0) ->
+apply(_, #checkout{spec = cancel, consumer_id = ConsumerId}, Effects0, State0) ->
{CancelEffects, State1} = cancel_consumer(ConsumerId, {Effects0, State0}),
% TODO: here we should really demonitor the pid but _only_ if it has no
% other consumers or enqueuers.
checkout(State1, CancelEffects);
-apply(_, {checkout, Spec, {_Tag, Pid} = ConsumerId}, Effects0, State0) ->
- State1 = update_consumer(ConsumerId, Spec, State0),
+apply(_, #checkout{spec = Spec, meta = Meta, consumer_id = {_, Pid} = ConsumerId},
+ Effects0, State0) ->
+ State1 = update_consumer(ConsumerId, Meta, Spec, State0),
{State, Effects, Res} = checkout(State1, Effects0),
{State, [{monitor, process, Pid} | Effects], Res};
-apply(#{index := RaftIdx}, purge, Effects0,
+apply(#{index := RaftIdx}, #purge{}, Effects0,
#state{consumers = Cons0, ra_indexes = Indexes } = State0) ->
Total = rabbit_fifo_index:size(Indexes),
{State1, Effects1, _} =
@@ -459,7 +506,7 @@ apply(_, {nodeup, Node}, Effects0,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok};
-apply(_, {update_state, Conf}, Effects, State) ->
+apply(_, #update_state{config = Conf}, Effects, State) ->
{update_state(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
@@ -567,6 +614,14 @@ query_ra_indexes(#state{ra_indexes = RaIndexes}) ->
query_consumer_count(#state{consumers = Consumers}) ->
maps:size(Consumers).
+query_consumers(#state{consumers = Consumers}) ->
+ maps:map(fun ({Tag, Pid}, #consumer{meta = Meta}) ->
+ {Pid, Tag,
+ maps:get(ack, Meta, undefined),
+ maps:get(prefetch, Meta, undefined),
+ maps:get(args, Meta, []),
+ maps:get(username, Meta, undefined)}
+ end, Consumers).
%% other
-spec usage(atom()) -> float().
@@ -998,11 +1053,12 @@ uniq_queue_in(Key, Queue) ->
end.
-update_consumer(ConsumerId, {Life, Credit, Mode},
+update_consumer(ConsumerId, Meta, {Life, Credit, Mode},
#state{consumers = Cons0,
service_queue = ServiceQueue0} = State0) ->
%% TODO: this logic may not be correct for updating a pre-existing consumer
- Init = #consumer{lifetime = Life, credit = Credit, credit_mode = Mode},
+ Init = #consumer{lifetime = Life, meta = Meta,
+ credit = Credit, credit_mode = Mode},
Cons = maps:update_with(ConsumerId,
fun(S) ->
%% remove any in-flight messages from
@@ -1055,6 +1111,41 @@ dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0),
Con#consumer{checked_out = Checked}.
+-spec make_enqueue(maybe(pid()), maybe(msg_seqno()), raw_msg()) -> protocol().
+make_enqueue(Pid, Seq, Msg) ->
+ #enqueue{pid = Pid, seq = Seq, msg = Msg}.
+-spec make_checkout(consumer_id(),
+ checkout_spec(), consumer_meta()) -> protocol().
+make_checkout(ConsumerId, Spec, Meta) ->
+ #checkout{consumer_id = ConsumerId,
+ spec = Spec, meta = Meta}.
+
+-spec make_settle(consumer_id(), [msg_id()]) -> protocol().
+make_settle(ConsumerId, MsgIds) ->
+ #settle{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_return(consumer_id(), [msg_id()]) -> protocol().
+make_return(ConsumerId, MsgIds) ->
+ #return{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_discard(consumer_id(), [msg_id()]) -> protocol().
+make_discard(ConsumerId, MsgIds) ->
+ #discard{consumer_id = ConsumerId, msg_ids = MsgIds}.
+
+-spec make_credit(consumer_id(), non_neg_integer(), non_neg_integer(),
+ boolean()) -> protocol().
+make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
+ #credit{consumer_id = ConsumerId,
+ credit = Credit,
+ delivery_count = DeliveryCount,
+ drain = Drain}.
+
+-spec make_purge() -> protocol().
+make_purge() -> #purge{}.
+
+-spec make_update_state(config()) -> protocol().
+make_update_state(Config) ->
+ #update_state{config = Config}.
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
@@ -1087,7 +1178,9 @@ enq_enq_checkout_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
{_State3, Effects, _} =
- apply(meta(3), {checkout, {once, 2, simple_prefetch}, Cid}, [], State2),
+ apply(meta(3),
+ make_checkout(Cid, {once, 2, simple_prefetch}, #{}),
+ [], State2),
?ASSERT_EFF({monitor, _, _}, Effects),
?ASSERT_EFF({send_msg, _, {delivery, _, _}, _}, Effects),
ok.
@@ -1097,7 +1190,8 @@ credit_enq_enq_checkout_settled_credit_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
{State2, _} = enq(2, 2, second, State1),
{State3, Effects, _} =
- apply(meta(3), {checkout, {auto, 1, credited}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {auto, 1, credited}, #{}),
+ [], State2),
?ASSERT_EFF({monitor, _, _}, Effects),
Deliveries = lists:filter(fun ({send_msg, _, {delivery, _, _}, _}) -> true;
(_) -> false
@@ -1127,12 +1221,13 @@ credit_with_drained_test() ->
State0 = test_init(test),
%% checkout with a single credit
{State1, _, _} =
- apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0),
+ apply(meta(1), make_checkout(Cid, {auto, 1, credited},#{}),
+ [], State0),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
delivery_count = 0}}},
State1),
{State, _Effs, Result} =
- apply(meta(3), {credit, 0, 5, true, Cid}, [], State1),
+ apply(meta(3), make_credit(Cid, 0, 5, true), [], State1),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
@@ -1147,12 +1242,13 @@ credit_and_drain_test() ->
{State2, _} = enq(2, 2, second, State1),
%% checkout without any initial credit (like AMQP 1.0 would)
{State3, CheckEffs, _} =
- apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {auto, 0, credited}, #{}),
+ [], State2),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
{State4, Effects, {multi, [{send_credit_reply, 0},
{send_drained, [{?FUNCTION_NAME, 2}]}]}} =
- apply(meta(4), {credit, 4, 0, true, Cid}, [], State3),
+ apply(meta(4), make_credit(Cid, 4, 0, true), [], State3),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
@@ -1171,7 +1267,8 @@ enq_enq_deq_test() ->
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
{_State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State2),
ok.
enq_enq_deq_deq_settle_test() ->
@@ -1180,9 +1277,11 @@ enq_enq_deq_deq_settle_test() ->
{State2, _} = enq(2, 2, second, State1),
% get returns a reply value
{State3, [{monitor, _, _}], {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, unsettled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State2),
{_State4, _Effects4, {dequeue, empty}} =
- apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State3),
ok.
enq_enq_checkout_get_settled_test() ->
@@ -1190,23 +1289,28 @@ enq_enq_checkout_get_settled_test() ->
{State1, _} = enq(1, 1, first, test_init(test)),
% get returns a reply value
{_State2, _Effects, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
checkout_get_empty_test() ->
Cid = {?FUNCTION_NAME, self()},
State = test_init(test),
{_State2, [], {dequeue, empty}} =
- apply(meta(1), {checkout, {dequeue, unsettled}, Cid}, [], State),
+ apply(meta(1), make_checkout(Cid, {dequeue, unsettled}, #{}),
+ [], State),
ok.
untracked_enq_deq_test() ->
Cid = {?FUNCTION_NAME, self()},
State0 = test_init(test),
- {State1, _, _} = apply(meta(1), {enqueue, undefined, undefined, first}, [], State0),
+ {State1, _, _} = apply(meta(1),
+ make_enqueue(undefined, undefined, first), [], State0),
{_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
+
release_cursor_test() ->
Cid = {?FUNCTION_NAME, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
@@ -1277,7 +1381,7 @@ return_non_existent_test() ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
% return non-existent
- {_State2, [], _} = apply(meta(3), {return, [99], Cid}, [], State0),
+ {_State2, [], _} = apply(meta(3), make_return(Cid, [99]), [], State0),
ok.
return_checked_out_test() ->
@@ -1287,7 +1391,8 @@ return_checked_out_test() ->
{send_msg, _, {delivery, _, [{MsgId, _}]}, _}]} =
check(Cid, 2, State0),
% return
- {_State2, [_, _], _} = apply(meta(3), {return, [MsgId], Cid}, [], State1),
+ {_State2, [_, _], _} = apply(meta(3), make_return(Cid, [MsgId]),
+ [], State1),
ok.
return_auto_checked_out_test() ->
@@ -1300,7 +1405,7 @@ return_auto_checked_out_test() ->
{send_msg, _, {delivery, _, [{MsgId, _}]}, _} | _]} =
check_auto(Cid, 2, State0),
% return should include another delivery
- {_State2, Effects, _} = apply(meta(3), {return, [MsgId], Cid}, [], State1),
+ {_State2, Effects, _} = apply(meta(3), make_return(Cid, [MsgId]), [], State1),
?ASSERT_EFF({send_msg, _,
{delivery, _, [{_, {#{delivery_count := 1}, first}}]}, _},
Effects),
@@ -1313,12 +1418,15 @@ cancelled_checkout_out_test() ->
{State0, [_]} = enq(2, 2, second, State00),
{State1, _} = check_auto(Cid, 2, State0),
% cancelled checkout should return all pending messages to queue
- {State2, _, _} = apply(meta(3), {checkout, cancel, Cid}, [], State1),
+ {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}),
+ [], State1),
+ ?assertEqual(2, maps:size(State2#state.messages)),
{State3, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State2),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), [], State2),
+ ?debugFmt("State3 ~p", [State3]),
{_State, _, {dequeue, {_, {_, second}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), [], State3),
ok.
down_with_noproc_consumer_returns_unsettled_test() ->
@@ -1394,7 +1502,7 @@ discarded_message_without_dead_letter_handler_is_removed_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1),
+ {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]), [], State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -1411,7 +1519,8 @@ discarded_message_with_dead_letter_handler_emits_mod_call_effect_test() ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, Effects2, _} = apply(meta(1), {discard, [0], Cid}, [], State1),
+ {_State2, Effects2, _} = apply(meta(1), make_discard(Cid, [0]),
+ [], State1),
% assert mod call effect with appended reason and message
?ASSERT_EFF({mod_call, somemod, somefun, [somearg, [{rejected, first}]]},
Effects2),
@@ -1424,7 +1533,7 @@ tick_test() ->
{S1, _} = enq(2, 2, snd, S0),
{S2, {MsgId, _}} = deq(3, Cid, unsettled, S1),
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
- {S4, _, _} = apply(meta(5), {return, [MsgId], Cid}, [], S3),
+ {S4, _, _} = apply(meta(5), make_return(Cid, [MsgId]), [], S3),
[{mod_call, _, _, [_, {test, 1, 1, 2, 1}]}, {aux, emit}] = tick(1, S4),
ok.
@@ -1433,14 +1542,14 @@ enq_deq_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, settled}, Cid},
- {enqueue, self(), 3, three},
- {enqueue, self(), 4, four},
- {checkout, {dequeue, settled}, Cid},
- {enqueue, self(), 5, five},
- {checkout, {dequeue, settled}, Cid}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, settled}, #{}),
+ make_enqueue(self(), 3, three),
+ make_enqueue(self(), 4, four),
+ make_checkout(Cid, {dequeue, settled}, #{}),
+ make_enqueue(self(), 5, five),
+ make_checkout(Cid, {dequeue, settled}, #{})
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1450,10 +1559,10 @@ enq_deq_settle_snapshot_recover_test() ->
% OthPid = spawn(fun () -> ok end),
% Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Cid}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Cid, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1463,13 +1572,13 @@ enq_deq_settle_snapshot_recover_2_test() ->
OthPid = spawn(fun () -> ok end),
Oth = {<<"oth">>, OthPid},
Commands = [
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {checkout, {dequeue, unsettled}, Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, two},
- {checkout, {dequeue, unsettled}, Oth},
- {settle, [0], Oth}
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, two),
+ make_checkout(Cid, {dequeue, unsettled}, #{}),
+ make_settle(Oth, [0])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1477,11 +1586,11 @@ snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {enqueue, self(), 3, three},
- purge
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_enqueue(self(), 3, three),
+ make_purge()
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1489,12 +1598,12 @@ enq_deq_return_settle_snapshot_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one}, %% to Cid
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {return, [0], Cid}, %% should be re-delivered to Cid
- {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
- {settle, [1], Cid},
- {settle, [2], Cid}
+ make_enqueue(self(), 1, one), %% to Cid
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_return(Cid, [0]), %% should be re-delivered to Cid
+ make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
+ make_settle(Cid, [1]),
+ make_settle(Cid, [2])
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1502,10 +1611,10 @@ return_prefix_msg_count_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one},
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {checkout, cancel, Cid},
- {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2
+ make_enqueue(self(), 1, one),
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_checkout(Cid, cancel, #{}),
+ make_enqueue(self(), 2, two) %% Cid prefix_msg_count: 2
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
@@ -1518,16 +1627,16 @@ return_settle_snapshot_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {enqueue, self(), 1, one}, %% to Cid
- {checkout, {auto, 1, simple_prefetch}, Cid},
- {return, [0], Cid}, %% should be re-delivered to Oth
- {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2
- {settle, [1], Cid},
- {return, [2], Cid},
- {settle, [3], Cid},
- {enqueue, self(), 3, three},
- purge,
- {enqueue, self(), 4, four}
+ make_enqueue(self(), 1, one), %% to Cid
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ make_return(Cid, [0]), %% should be re-delivered to Oth
+ make_enqueue(self(), 2, two), %% Cid prefix_msg_count: 2
+ make_settle(Cid, [1]),
+ make_return(Cid, [2]),
+ make_settle(Cid, [3]),
+ make_enqueue(self(), 3, three),
+ make_purge(),
+ make_enqueue(self(), 4, four)
],
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1535,13 +1644,13 @@ enq_check_settle_snapshot_recover_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {settle, [1], Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, three},
- {settle, [2], Cid}
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, three),
+ make_settle(Cid, [2])
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1550,13 +1659,13 @@ enq_check_settle_snapshot_purge_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {settle, [1], Cid},
- {settle, [0], Cid},
- {enqueue, self(), 3, three},
- purge
+ make_checkout(Cid, {auto, 2, simple_prefetch},#{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [0]),
+ make_enqueue(self(), 3, three),
+ make_purge()
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
@@ -1566,38 +1675,18 @@ enq_check_settle_duplicate_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 2, simple_prefetch}, Cid},
- {enqueue, self(), 1, one}, %% 0
- {enqueue, self(), 2, two}, %% 0
- {settle, [0], Cid},
- {settle, [1], Cid},
- {settle, [1], Cid},
- {enqueue, self(), 3, three},
- {settle, [2], Cid}
+ make_checkout(Cid, {auto, 2, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one), %% 0
+ make_enqueue(self(), 2, two), %% 0
+ make_settle(Cid, [0]),
+ make_settle(Cid, [1]),
+ make_settle(Cid, [1]),
+ make_enqueue(self(), 3, three),
+ make_settle(Cid, [2])
],
% ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]),
run_snapshot_test(?FUNCTION_NAME, Commands).
-
-multi_return_snapshot_test() ->
- %% this was discovered using property testing
- C1 = {<<>>, c:pid(0,6723,1)},
- C2 = {<<0>>,c:pid(0,6723,1)},
- E = c:pid(0,6720,1),
- Commands = [
- {checkout,{auto,2,simple_prefetch},C1},
- {enqueue,E,1,msg},
- {enqueue,E,2,msg},
- {checkout,cancel,C1}, %% both on returns queue
- {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2
- {return,[0],C2}, %% E1 in returns, E2 with C2
- {return,[1],C2}, %% E2 in returns E1 with C2
- {settle,[2],C2} %% E2 with C2
- ],
- run_snapshot_test(?FUNCTION_NAME, Commands),
- ok.
-
-
run_snapshot_test(Name, Commands) ->
%% create every incremental permuation of the commands lists
%% and run the snapshot tests against that
@@ -1634,11 +1723,11 @@ delivery_query_returns_deliveries_test() ->
Tag = atom_to_binary(?FUNCTION_NAME, utf8),
Cid = {Tag, self()},
Commands = [
- {checkout, {auto, 5, simple_prefetch}, Cid},
- {enqueue, self(), 1, one},
- {enqueue, self(), 2, two},
- {enqueue, self(), 3, tre},
- {enqueue, self(), 4, for}
+ make_checkout(Cid, {auto, 5, simple_prefetch}, #{}),
+ make_enqueue(self(), 1, one),
+ make_enqueue(self(), 2, two),
+ make_enqueue(self(), 3, tre),
+ make_enqueue(self(), 4, for)
],
Indexes = lists:seq(1, length(Commands)),
Entries = lists:zip(Indexes, Commands),
@@ -1654,7 +1743,8 @@ pending_enqueue_is_enqueued_on_down_test() ->
{State0, _} = enq(1, 2, first, test_init(test)),
{State1, _, _} = apply(meta(2), {down, Pid, noproc}, [], State0),
{_State2, _, {dequeue, {0, {_, first}}}} =
- apply(meta(3), {checkout, {dequeue, settled}, Cid}, [], State1),
+ apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}),
+ [], State1),
ok.
duplicate_delivery_test() ->
@@ -1699,11 +1789,11 @@ state_enter_montors_and_notifications_test() ->
purge_test() ->
Cid = {<<"purge_test">>, self()},
{State1, _} = enq(1, 1, first, test_init(test)),
- {State2, _, {purge, 1}} = apply(meta(2), purge, [], State1),
+ {State2, _, {purge, 1}} = apply(meta(2), make_purge(), [], State1),
{State3, _} = enq(3, 2, second, State2),
% get returns a reply value
{_State4, [{monitor, _, _}], {dequeue, {0, {_, second}}}} =
- apply(meta(4), {checkout, {dequeue, unsettled}, Cid}, [], State3),
+ apply(meta(4), make_checkout(Cid, {dequeue, unsettled}, #{}), [], State3),
ok.
purge_with_checkout_test() ->
@@ -1711,7 +1801,7 @@ purge_with_checkout_test() ->
{State0, _} = check_auto(Cid, 1, test_init(?FUNCTION_NAME)),
{State1, _} = enq(2, 1, first, State0),
{State2, _} = enq(3, 2, second, State1),
- {State3, _, {purge, 2}} = apply(meta(2), purge, [], State2),
+ {State3, _, {purge, 2}} = apply(meta(2), make_purge(), [], State2),
#consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers),
?assertEqual(0, maps:size(Checked)),
ok.
@@ -1721,36 +1811,47 @@ meta(Idx) ->
enq(Idx, MsgSeq, Msg, State) ->
strip_reply(
- apply(meta(Idx), {enqueue, self(), MsgSeq, Msg}, [], State)).
+ apply(meta(Idx), make_enqueue(self(), MsgSeq, Msg), [], State)).
deq(Idx, Cid, Settlement, State0) ->
{State, _, {dequeue, Msg}} =
- apply(meta(Idx), {checkout, {dequeue, Settlement}, Cid}, [], State0),
+ apply(meta(Idx),
+ make_checkout(Cid, {dequeue, Settlement}, #{}),
+ [], State0),
{State, Msg}.
check_n(Cid, Idx, N, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {auto, N, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {auto, N, simple_prefetch}, #{}),
+ [], State)).
check(Cid, Idx, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {once, 1, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {once, 1, simple_prefetch}, #{}),
+ [], State)).
check_auto(Cid, Idx, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {auto, 1, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {auto, 1, simple_prefetch}, #{}),
+ [], State)).
check(Cid, Idx, Num, State) ->
- strip_reply(apply(meta(Idx),
- {checkout, {once, Num, simple_prefetch}, Cid}, [], State)).
+ strip_reply(
+ apply(meta(Idx),
+ make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
+ [], State)).
settle(Cid, Idx, MsgId, State) ->
- strip_reply(apply(meta(Idx), {settle, [MsgId], Cid}, [], State)).
+ strip_reply(apply(meta(Idx), make_settle(Cid, [MsgId]), [], State)).
credit(Cid, Idx, Credit, DelCnt, Drain, State) ->
- strip_reply(apply(meta(Idx), {credit, Credit, DelCnt, Drain, Cid}, [], State)).
+ strip_reply(apply(meta(Idx), make_credit(Cid, Credit, DelCnt, Drain),
+ [], State)).
-strip_reply({State, Effects, _Replu}) ->
+strip_reply({State, Effects, _Reply}) ->
{State, Effects}.
run_log(InitState, Entries) ->
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 635d85be4a..9cdb1dfbe7 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -24,8 +24,8 @@
init/2,
init/3,
init/5,
- checkout/3,
checkout/4,
+ checkout/5,
cancel_checkout/2,
enqueue/2,
enqueue/3,
@@ -42,6 +42,7 @@
]).
-include_lib("ra/include/ra.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-define(SOFT_LIMIT, 256).
@@ -51,7 +52,7 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
--record(consumer, {last_msg_id :: seq(),
+-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
-record(state, {cluster_name :: ra_cluster_name(),
@@ -98,7 +99,7 @@ init(ClusterName, Servers) ->
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
-init(ClusterName, Servers, SoftLimit) ->
+init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
servers = Servers,
@@ -107,7 +108,7 @@ init(ClusterName, Servers, SoftLimit) ->
-spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
-init(ClusterName, Servers, SoftLimit, BlockFun, UnblockFun) ->
+init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
servers = Servers,
@@ -135,7 +136,7 @@ enqueue(Correlation, Msg, State0 = #state{slow = Slow,
Node = pick_node(State0),
{Next, State1} = next_enqueue_seq(State0),
% by default there is no correlation id
- Cmd = {enqueue, self(), Next, Msg},
+ Cmd = rabbit_fifo:make_enqueue(self(), Next, Msg),
case send_command(Node, Correlation, Cmd, low, State1) of
{slow, _} = Ret when not Slow ->
BlockFun(),
@@ -177,8 +178,11 @@ enqueue(Msg, State) ->
dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
Node = pick_node(State0),
ConsumerId = consumer_id(ConsumerTag),
- case ra:process_command(Node, {checkout, {dequeue, Settlement},
- ConsumerId}, Timeout) of
+ case ra:process_command(Node,
+ rabbit_fifo:make_checkout(ConsumerId,
+ {dequeue, Settlement},
+ #{}),
+ Timeout) of
{ok, {dequeue, Reply}, Leader} ->
{ok, Reply, State0#state{leader = Leader}};
Err ->
@@ -198,7 +202,7 @@ dequeue(ConsumerTag, Settlement, #state{timeout = Timeout} = State0) ->
{ok, state()}.
settle(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
- Cmd = {settle, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_settle(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -232,7 +236,7 @@ settle(ConsumerTag, [_|_] = MsgIds,
return(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
% TODO: make rabbit_fifo return support lists of message ids
- Cmd = {return, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_return(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -265,7 +269,7 @@ return(ConsumerTag, [_|_] = MsgIds,
{ok | slow, state()}.
discard(ConsumerTag, [_|_] = MsgIds, #state{slow = false} = State0) ->
Node = pick_node(State0),
- Cmd = {discard, MsgIds, consumer_id(ConsumerTag)},
+ Cmd = rabbit_fifo:make_discard(consumer_id(ConsumerTag), MsgIds),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -299,9 +303,10 @@ discard(ConsumerTag, [_|_] = MsgIds,
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
-spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
+ rabbit_fifo:consumer_meta(),
state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, State0) ->
- checkout(ConsumerTag, NumUnsettled, simple_prefetch, State0).
+checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) ->
+ checkout(ConsumerTag, NumUnsettled, simple_prefetch, ConsumerInfo, State0).
%% @doc Register with the rabbit_fifo queue to "checkout" messages as they
%% become available.
@@ -319,13 +324,17 @@ checkout(ConsumerTag, NumUnsettled, State0) ->
%% @param State The {@module} state.
%%
%% @returns `{ok, State}' or `{error | timeout, term()}'
--spec checkout(rabbit_fifo:consumer_tag(), NumUnsettled :: non_neg_integer(),
+-spec checkout(rabbit_fifo:consumer_tag(),
+ NumUnsettled :: non_neg_integer(),
CreditMode :: rabbit_fifo:credit_mode(),
+ Meta :: rabbit_fifo:consumer_meta(),
state()) -> {ok, state()} | {error | timeout, term()}.
-checkout(ConsumerTag, NumUnsettled, CreditMode, State0) ->
+checkout(ConsumerTag, NumUnsettled, CreditMode, Meta, State0) ->
Servers = sorted_servers(State0),
ConsumerId = {ConsumerTag, self()},
- Cmd = {checkout, {auto, NumUnsettled, CreditMode}, ConsumerId},
+ Cmd = rabbit_fifo:make_checkout(ConsumerId,
+ {auto, NumUnsettled, CreditMode},
+ Meta),
try_process_command(Servers, Cmd, State0).
%% @doc Provide credit to the queue
@@ -348,8 +357,8 @@ credit(ConsumerTag, Credit, Drain,
%% add one as it is 0 indexed
C = maps:get(ConsumerTag, CDels, #consumer{last_msg_id = -1}),
Node = pick_node(State0),
- Cmd = {credit, Credit, C#consumer.last_msg_id + 1, Drain, ConsumerId},
- ct:pal("sending credit ~w", [Cmd]),
+ Cmd = rabbit_fifo:make_credit(ConsumerId, Credit,
+ C#consumer.last_msg_id + 1, Drain),
case send_command(Node, undefined, Cmd, normal, State0) of
{slow, S} ->
% turn slow into ok for this function
@@ -372,7 +381,7 @@ credit(ConsumerTag, Credit, Drain,
cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
Servers = sorted_servers(State0),
ConsumerId = {ConsumerTag, self()},
- Cmd = {checkout, cancel, ConsumerId},
+ Cmd = rabbit_fifo:make_checkout(ConsumerId, cancel, #{}),
State = State0#state{consumer_deliveries = maps:remove(ConsumerTag, CDels)},
try_process_command(Servers, Cmd, State).
@@ -380,7 +389,7 @@ cancel_checkout(ConsumerTag, #state{consumer_deliveries = CDels} = State0) ->
%% of messages purged.
-spec purge(ra_server_id()) -> {ok, non_neg_integer()} | {error | timeout, term()}.
purge(Node) ->
- case ra:process_command(Node, purge) of
+ case ra:process_command(Node, rabbit_fifo:make_purge()) of
{ok, {purge, Reply}, _} ->
{ok, Reply};
Err ->
@@ -393,7 +402,7 @@ cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
- case ra:process_command(Node, {update_state, Conf}) of
+ case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
{ok, ok, _} ->
ok;
Err ->
@@ -520,7 +529,7 @@ handle_ra_event(_Leader, {machine, eol}, _State0) ->
-spec untracked_enqueue([ra_server_id()], term()) ->
ok.
untracked_enqueue([Node | _], Msg) ->
- Cmd = {enqueue, undefined, undefined, Msg},
+ Cmd = rabbit_fifo:make_enqueue(undefined, undefined, Msg),
ok = ra:pipeline_command(Node, Cmd),
ok.
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 47b8a6f643..bb8af13b9d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -19,7 +19,7 @@
-export([init_state/2, handle_event/2]).
-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]).
-export([info/1, info/2, stat/1, infos/1]).
--export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]).
+-export([ack/3, reject/4, basic_get/4, basic_consume/10, basic_cancel/4]).
-export([credit/4]).
-export([purge/1]).
-export([stateless_deliver/2, deliver/3]).
@@ -47,12 +47,9 @@
-spec handle_event({'ra_event', ra_server_id(), any()}, rabbit_fifo_client:state()) ->
{'internal', Correlators :: [term()], rabbit_fifo_client:state()} |
{rabbit_fifo:client_msg(), rabbit_fifo_client:state()}.
--spec declare(rabbit_types:amqqueue()) -> {'new', rabbit_types:amqqueue(), rabbit_fifo_client:state()}.
-spec recover([rabbit_types:amqqueue()]) -> [rabbit_types:amqqueue() |
{'absent', rabbit_types:amqqueue(), atom()}].
-spec stop(rabbit_types:vhost()) -> 'ok'.
--spec delete(rabbit_types:amqqueue(), boolean(), boolean(), rabbit_types:username()) ->
- {'ok', QLen :: non_neg_integer()}.
-spec ack(rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
@@ -61,15 +58,9 @@
rabbit_fifo_client:state()) ->
{'ok', 'empty', rabbit_fifo_client:state()} |
{'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
--spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
- ConsumerPrefetchCount :: non_neg_integer(), rabbit_types:ctag(),
- ExclusiveConsume :: boolean(), Args :: rabbit_framing:amqp_table(),
- any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
--spec deliver(Confirm :: boolean(), rabbit_types:delivery(), rabbit_fifo_client:state()) ->
- rabbit_fifo_client:state().
-spec info(rabbit_types:amqqueue()) -> rabbit_types:infos().
-spec info(rabbit_types:amqqueue(), rabbit_types:info_keys()) -> rabbit_types:infos().
-spec infos(rabbit_types:r('queue')) -> rabbit_types:infos().
@@ -95,7 +86,7 @@
-spec init_state(ra_server_id(), rabbit_types:r('queue')) ->
rabbit_fifo_client:state().
-init_state({Name, _}, QName) ->
+init_state({Name, _}, QName = #resource{}) ->
{ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit),
%% This lookup could potentially return an {error, not_found}, but we do not
%% know what to do if the queue has `disappeared`. Let it crash.
@@ -104,13 +95,16 @@ init_state({Name, _}, QName) ->
%% Ensure the leader is listed first
Servers0 = [{Name, N} || N <- Nodes],
Servers = [Leader | lists:delete(Leader, Servers0)],
- rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit,
+ rabbit_fifo_client:init(QName, Servers, SoftLimit,
fun() -> credit_flow:block(Name), ok end,
fun() -> credit_flow:unblock(Name), ok end).
handle_event({ra_event, From, Evt}, QState) ->
rabbit_fifo_client:handle_ra_event(From, Evt, QState).
+-spec declare(rabbit_types:amqqueue()) ->
+ {'new', rabbit_types:amqqueue()} |
+ {existing, rabbit_types:amqqueue()}.
declare(#amqqueue{name = QName,
durable = Durable,
auto_delete = AutoDelete,
@@ -244,9 +238,9 @@ recover(Queues) ->
RaNodes = [{Name, Node} || Node <- Nodes],
case ra:start_server(Name, {Name, node()}, Machine, RaNodes) of
ok -> ok;
- Err ->
+ Err2 ->
rabbit_log:warning("recover: quorum queue ~w could not"
- " be started ~w", [Name, Err]),
+ " be started ~w", [Name, Err2]),
ok
end;
{error, {already_started, _}} ->
@@ -272,7 +266,12 @@ stop(VHost) ->
_ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
ok.
-delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes},
+-spec delete(rabbit_types:amqqueue(),
+ boolean(), boolean(),
+ rabbit_types:username()) ->
+ {ok, QLen :: non_neg_integer()}.
+delete(#amqqueue{type = quorum, pid = {Name, _},
+ name = QName, quorum_nodes = QNodes},
_IfUnused, _IfEmpty, ActingUser) ->
%% TODO Quorum queue needs to support consumer tracking for IfUnused
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -342,17 +341,35 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
{error, timeout}
end.
+-spec basic_consume(rabbit_types:amqqueue(), NoAck :: boolean(), ChPid :: pid(),
+ ConsumerPrefetchCount :: non_neg_integer(),
+ rabbit_types:ctag(), ExclusiveConsume :: boolean(),
+ Args :: rabbit_framing:amqp_table(), ActingUser :: binary(),
+ any(), rabbit_fifo_client:state()) ->
+ {'ok', rabbit_fifo_client:state()}.
basic_consume(#amqqueue{name = QName, type = quorum}, NoAck, ChPid,
- ConsumerPrefetchCount, ConsumerTag, ExclusiveConsume, Args, OkMsg,
- QState0) ->
+ ConsumerPrefetchCount, ConsumerTag0, ExclusiveConsume, Args,
+ ActingUser, OkMsg, QState0) ->
+ %% TODO: validate consumer arguments
+ %% currently quorum queues do not support any arguments
maybe_send_reply(ChPid, OkMsg),
- %% A prefetch count of 0 means no limitation, let's make it into something large for ra
+ ConsumerTag = quorum_ctag(ConsumerTag0),
+ %% A prefetch count of 0 means no limitation,
+ %% let's make it into something large for ra
Prefetch = case ConsumerPrefetchCount of
0 -> 2000;
Other -> Other
end,
- {ok, QState} = rabbit_fifo_client:checkout(quorum_ctag(ConsumerTag),
- Prefetch, QState0),
+ %% consumer info is used to describe the consumer properties
+ ConsumerMeta = #{ack => not NoAck,
+ prefetch => ConsumerPrefetchCount,
+ args => Args,
+ username => ActingUser},
+ {ok, QState} = rabbit_fifo_client:checkout(ConsumerTag,
+ Prefetch,
+ ConsumerMeta,
+ QState0),
+ %% TODO: emit as rabbit_fifo effect
rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck, QName,
ConsumerPrefetchCount, Args),
@@ -366,6 +383,9 @@ stateless_deliver(ServerId, Delivery) ->
ok = rabbit_fifo_client:untracked_enqueue([ServerId],
Delivery#delivery.message).
+-spec deliver(Confirm :: boolean(), rabbit_types:delivery(),
+ rabbit_fifo_client:state()) ->
+ {ok | slow, rabbit_fifo_client:state()}.
deliver(false, Delivery, QState0) ->
rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0);
deliver(true, Delivery, QState0) ->
@@ -401,8 +421,8 @@ cleanup_data_dir() ->
<- rabbit_amqqueue:list_by_type(quorum),
lists:member(node(), Nodes)],
Registered = ra_directory:list_registered(),
- [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
- not lists:member(Name, Names)],
+ _ = [maybe_delete_data_dir(UId) || {Name, UId} <- Registered,
+ not lists:member(Name, Names)],
ok.
maybe_delete_data_dir(UId) ->