diff options
| author | kjnilsson <knilsson@pivotal.io> | 2019-01-17 10:28:37 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2019-01-17 17:57:10 +0000 |
| commit | 118e74b97527887d72b00d4200e3a16c63ef8b99 (patch) | |
| tree | 872e2ecb30b11666e2dec4dd1f9f78fd4b7ab7ec | |
| parent | f7a264bfd03a4248503ef9f1dae6b800d913624e (diff) | |
| download | rabbitmq-server-git-118e74b97527887d72b00d4200e3a16c63ef8b99.tar.gz | |
more wip
| -rw-r--r-- | src/rabbit_channel.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 53 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 133 |
3 files changed, 158 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index eaee785aea..37198da5cf 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1341,6 +1341,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin, "amq.ctag"); Other -> Other end, + %% call queue type abstraction here?? case basic_consume( QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ExclusiveConsume, Args, NoWait, State) of @@ -1949,16 +1950,19 @@ notify_queues(State = #ch{consumer_mapping = Consumers, {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), State#ch{state = closing}}. +%% this function takes the list of unacked records and aggregates msg ids +%% per queue ref, then it folds over the result set with the provided folder +%% function foreach_per_queue(_F, [], Acc) -> Acc; foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) -> - %% quorum queue, needs the consumer tag F({QPid, CTag}, [MsgId], Acc); foreach_per_queue(F, UAL, Acc) -> T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) -> rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T) end, gb_trees:empty(), UAL), - rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). + rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, + Acc, T). consumer_queue_refs(Consumers) -> lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl new file mode 100644 index 0000000000..797cf035fa --- /dev/null +++ b/src/rabbit_classic_queue.erl @@ -0,0 +1,53 @@ +-module(rabbit_classic_queue). + +-export([ + init/1, + begin_receive/4 + ]). + +-record(?MODULE, {pid :: pid()}). + +-opaque state() :: #?MODULE{}. + +-export_type([ + state/0 + ]). + +init(QDef) -> + {#?MODULE{}, []}. + +begin_receive(_QId, State, ConsumerTag, + #{acting_user := ActingUser} = Meta) -> + Args = maps:get(consumer_args, Meta, []), + %% TODO: validate consume arguments + {Prefetch, NoAck} = + case Meta of + #{credit := {simple_prefetch, P}} -> + {P, false}; + #{credit := none} -> + {0, true} + end, + + ExclusiveConsume = false, + QPid = State#?MODULE.pid, + ChPid = self(), + LimiterPid = undefined, + LimiterActive = false, + OkMsg = undefined, + case delegate:invoke(QPid, + {gen_server2, call, + [{basic_consume, NoAck, ChPid, LimiterPid, + LimiterActive, + Prefetch, ConsumerTag, + ExclusiveConsume, + Args, OkMsg, ActingUser}, infinity]}) of + ok -> + {ok, State, []}; + Err -> + Err + end. + + +-ifdef(TEST). +-include_lib("eunit/include/eunit.hrl"). +-endif. diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index 7c50e1fc7e..11fe18906e 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -7,6 +7,7 @@ in/4, handle_queue_info/3, handle_down/3, + begin_receive/4, to_map/1 ]). @@ -37,8 +38,6 @@ -opaque state() :: #?MODULE{}. -% -type msg() :: term(). - %% the session state maintained by each channel for each queue %% each queue type will have it's own state -type queue_state() :: term(). @@ -87,11 +86,16 @@ -type deliveries() :: [{out_tag(), term()}]. %% placeholder to represent deliviers received from a queue --type credit_def() :: {simple_prefetch, non_neg_integer()} | credited. +-type credit_def() :: {simple_prefetch, non_neg_integer()} | + credited | + none. --type receive_args() :: #{credit := credit_def(), +-type receive_meta() :: #{credit := credit_def(), + acting_user := binary(), atom() => term}. +-type receive_tag() :: binary(). + -type queue_def() :: term(). %% place holder for some means of configuring the queue - possibly %% an amqqueue record @@ -132,7 +136,7 @@ %% E.g. a "stream" queue could take an optional position argument to %% specificy where in the log to begin streaming from. -callback begin_receive(queue_id(), queue_state(), - Tag :: term(), Args :: receive_args()) -> + Tag :: term(), Args :: receive_meta()) -> {queue_state(), actions()}. %% end a receive using the Tag @@ -141,6 +145,13 @@ %% updates the message state for received messages %% indicate if they are considered settled by the receiver +%% TODO: need to support a variety of ways to update the message stae +%% AMQP 0.9.1: single tag or multiple (up to) +%% AMQP 1.0: range {first, last} +%% MQTT 3.1.1: appears to only PUBACK one at a time +%% STOMP: 1.2 also acks one at a time +%% KAFKA: Polls in batches, range or sequence would probably do +%% CoAP: may randomise sequence number - seqs may not be sequential -callback update_msg_state(queue_id(), queue_state(), OutTags :: [out_tag()], @@ -184,15 +195,26 @@ in(Destinations, SeqNum, Delivery, end end, {[], State0}, Destinations), + %% dispatch to each queue type implementation + {Queues, Actions} = lists:foldl( + fun(Qid, {Qus, As}) -> + #queue{module = Mod} = Q = maps:get(Qid, Qus), + {Q2, A} = Mod:in(Qid, Q, SeqNum, Delivery), + {Qus#{Qid => Q2}, A ++ As} + end, {State#?MODULE.queues, []}, QIds), - %% * if the queue is new perform queue detail lookup and initialise - %% by calling the impl init/1 function - %% * stash incoming message in pending_in - %% * foreach queue pass to `in' callback and aggregate actions %% TODO: how to aggregate network calls for classic queues (delegate) %% TODO: also credit flow??? - Pend = dtree:insert(SeqNum, QIds, Delivery, Pend0), - {State#?MODULE{pending_in = Pend}, []}. + Pend = record_pending(SeqNum, QIds, Delivery, Pend0), + {State#?MODULE{pending_in = Pend, + queues = Queues}, Actions}. + +record_pending(undefined, _QIds, _Delivery, Pend) -> + Pend; +record_pending(Seq, QIds, Delivery, Pend) + when is_integer(Seq) -> + dtree:insert(Seq, QIds, Delivery, Pend). + @@ -216,13 +238,14 @@ handle_queue_info(QueueId, Info, #?MODULE{queues = Queues} = State) -> %% process any `settle' actions %% handle actions {Actions, Pend} = - lists:foldl(fun({msg_state_update, accepted, Seqs} = Evt, {Acc, P}) -> + lists:foldl(fun({msg_state_update, DeliveryState, Seqs}, + {Acc, P}) -> case dtree:take(Seqs, QueueId, P) of {[], P1} -> {Acc, P1}; {Completed, P1} -> CompletedSeqs = [S || {S, _} <- Completed], - Evt = {msg_state_update, accepted, + Evt = {msg_state_update, DeliveryState, CompletedSeqs}, {[Evt | Acc], P1} end @@ -230,6 +253,14 @@ handle_queue_info(QueueId, Info, #?MODULE{queues = Queues} = State) -> {State#?MODULE{queues = Queues#{QueueId => Q#queue{state = Qs}}, pending_in = Pend}, lists:reverse(Actions)}. + +-spec begin_receive(queue_name(), state(), + receive_tag(), receive_meta()) -> + {ok, state(), actions()} | {error, duplicate}. +begin_receive(_QName, State, _Tag, _Args) -> + % Module:handle_down( + {ok, State, []}. + to_map(State) -> #{monitors => State#?MODULE.monitors, pending_in => State#?MODULE.pending_in, @@ -246,20 +277,25 @@ to_map(State) -> -ifdef(TEST). -include_lib("eunit/include/eunit.hrl"). -in_msg_is_accepted_test() -> - QName = make_queue_name(?FUNCTION_NAME), - meck:new(?FUNCTION_NAME, [non_strict]), - meck:expect(?FUNCTION_NAME, init, fun (Q) -> {Q, []} end), - meck:expect(?FUNCTION_NAME, in, +setup_meck_impl(Mod) -> + meck:new(Mod, [non_strict]), + meck:expect(Mod, init, fun (Q) -> {Q, []} end), + meck:expect(Mod, in, fun (_QId, Qs, _Seq, _Msg) -> {Qs, []} end), - meck:expect(?FUNCTION_NAME, handle_queue_info, + meck:expect(Mod, handle_queue_info, fun (_QId, {applied, Seqs}, Qs) when is_list(Seqs) -> %% simply record as accepted - {Qs, [{msg_state_update, accepted, Seqs}]} + {Qs, [{msg_state_update, accepted, Seqs}]}; + (_QId, {rejected, Seqs}, Qs) when is_list(Seqs) -> + {Qs, [{msg_state_update, rejected, Seqs}]} end), + ok. +in_msg_is_accepted_test() -> + QName = make_queue_name(?FUNCTION_NAME), + setup_meck_impl(?FUNCTION_NAME), LookupFun = fun(Name) -> #{module => ?FUNCTION_NAME, name => Name} @@ -277,27 +313,32 @@ in_msg_is_accepted_test() -> {Qs2, [{msg_state_update, accepted, [1]}]} = rabbit_queue_type:handle_queue_info(QId, {applied, [1]}, Qs1), + {Qs2, [{msg_state_update, rejected, [1]}]} = + rabbit_queue_type:handle_queue_info(QId, {rejected, [1]}, Qs1), %% no pending should remain inside the state after the queue has accepted %% the message ?assertEqual(0, dtree:size(Qs2#?MODULE.pending_in)), - % CustomerTag = UId, + ok. + +untracked_in_test() -> + QName = make_queue_name(?FUNCTION_NAME), + setup_meck_impl(?FUNCTION_NAME), + LookupFun = fun(Name) -> + #{module => ?FUNCTION_NAME, + name => Name} + end, + Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}), + + {Qs1, []} = rabbit_queue_type:in([QName], undefined, some_delivery, Qs0), + #?MODULE{queues = Queues, pending_in = P1} = Qs1, + [_] = maps:keys(Queues), + ?assertEqual(0, dtree:size(P1)), ok. in_msg_multi_queue_is_accepted_test() -> QName = make_queue_name(?FUNCTION_NAME), QAlt = make_queue_name(alt_queue_name), - meck:new(?FUNCTION_NAME, [non_strict]), - meck:expect(?FUNCTION_NAME, init, fun (Q) -> {Q, []} end), - meck:expect(?FUNCTION_NAME, in, - fun (_QId, Qs, _Seq, _Msg) -> - {Qs, []} - end), - meck:expect(?FUNCTION_NAME, handle_queue_info, - fun (_QId, {applied, Seqs}, Qs) when is_list(Seqs) -> - %% simply record as accepted - {Qs, [{msg_state_update, accepted, Seqs}]} - end), - + setup_meck_impl(?FUNCTION_NAME), LookupFun = fun(Name) -> #{module => ?FUNCTION_NAME, name => Name} @@ -305,7 +346,7 @@ in_msg_multi_queue_is_accepted_test() -> Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}), {#?MODULE{queue_names = #{QName := QId1, - QAlt := QId2}} =Qs1, + QAlt := QId2}} = Qs1, []} = rabbit_queue_type:in([QName, QAlt], 1, some_delivery, Qs0), %% no msg_state_update should be issued for the first one @@ -313,6 +354,30 @@ in_msg_multi_queue_is_accepted_test() -> rabbit_queue_type:handle_queue_info(QId1, {applied, [1]}, Qs1), {_, [{msg_state_update, accepted, [1]}]} = rabbit_queue_type:handle_queue_info(QId2, {applied, [1]}, Qs2), + ?assert(meck:called(?FUNCTION_NAME, in, ['_', '_', '_', '_'])), + ?assertEqual(2, meck:num_calls(?FUNCTION_NAME, in, '_')), + meck:unload(), + ok. + +begin_end_receive_test() -> + QName = make_queue_name(?FUNCTION_NAME), + setup_meck_impl(?FUNCTION_NAME), + LookupFun = fun(Name) -> + #{module => ?FUNCTION_NAME, + name => Name} + end, + Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}), + Tag = <<"my-tag">>, + Args = #{credit => {simple_prefetch, 10}}, + {ok, Qs1, []} = rabbit_queue_type:begin_receive(QName, Qs0, Tag, Args), + %% Assert new queue state was set up and Mod:begin_receive was called + ?assert(meck:called(?FUNCTION_NAME, begin_receive, ['_', '_', '_', '_'])), + {error, duplicate} = rabbit_queue_type:begin_receive(QName, Qs0, Tag, Args), + {ok, _, []} = rabbit_queue_type:end_receive(QName, Qs1, Tag), + ?assert(meck:called(?FUNCTION_NAME, end_receive, ['_', '_', '_'])), + %% TODO:if there are no publishes and no other receiver tags does this + %% end the queue session? + %% TODO: handle calls to being_receive with tame tag (should error) ok. make_queue_name(Name) when is_atom(Name) -> |
