summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2019-01-17 10:28:37 +0000
committerkjnilsson <knilsson@pivotal.io>2019-01-17 17:57:10 +0000
commit118e74b97527887d72b00d4200e3a16c63ef8b99 (patch)
tree872e2ecb30b11666e2dec4dd1f9f78fd4b7ab7ec
parentf7a264bfd03a4248503ef9f1dae6b800d913624e (diff)
downloadrabbitmq-server-git-118e74b97527887d72b00d4200e3a16c63ef8b99.tar.gz
more wip
-rw-r--r--src/rabbit_channel.erl8
-rw-r--r--src/rabbit_classic_queue.erl53
-rw-r--r--src/rabbit_queue_type.erl133
3 files changed, 158 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index eaee785aea..37198da5cf 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1341,6 +1341,7 @@ handle_method(#'basic.consume'{queue = QueueNameBin,
"amq.ctag");
Other -> Other
end,
+ %% call queue type abstraction here??
case basic_consume(
QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ExclusiveConsume, Args, NoWait, State) of
@@ -1949,16 +1950,19 @@ notify_queues(State = #ch{consumer_mapping = Consumers,
{rabbit_amqqueue:notify_down_all(QPids, self(), Timeout),
State#ch{state = closing}}.
+%% this function takes the list of unacked records and aggregates msg ids
+%% per queue ref, then it folds over the result set with the provided folder
+%% function
foreach_per_queue(_F, [], Acc) ->
Acc;
foreach_per_queue(F, [{_DTag, CTag, {QPid, MsgId}}], Acc) ->
- %% quorum queue, needs the consumer tag
F({QPid, CTag}, [MsgId], Acc);
foreach_per_queue(F, UAL, Acc) ->
T = lists:foldl(fun ({_DTag, CTag, {QPid, MsgId}}, T) ->
rabbit_misc:gb_trees_cons({QPid, CTag}, MsgId, T)
end, gb_trees:empty(), UAL),
- rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T).
+ rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end,
+ Acc, T).
consumer_queue_refs(Consumers) ->
lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}}
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
new file mode 100644
index 0000000000..797cf035fa
--- /dev/null
+++ b/src/rabbit_classic_queue.erl
@@ -0,0 +1,53 @@
+-module(rabbit_classic_queue).
+
+-export([
+ init/1,
+ begin_receive/4
+ ]).
+
+-record(?MODULE, {pid :: pid()}).
+
+-opaque state() :: #?MODULE{}.
+
+-export_type([
+ state/0
+ ]).
+
+init(QDef) ->
+ {#?MODULE{}, []}.
+
+begin_receive(_QId, State, ConsumerTag,
+ #{acting_user := ActingUser} = Meta) ->
+ Args = maps:get(consumer_args, Meta, []),
+ %% TODO: validate consume arguments
+ {Prefetch, NoAck} =
+ case Meta of
+ #{credit := {simple_prefetch, P}} ->
+ {P, false};
+ #{credit := none} ->
+ {0, true}
+ end,
+
+ ExclusiveConsume = false,
+ QPid = State#?MODULE.pid,
+ ChPid = self(),
+ LimiterPid = undefined,
+ LimiterActive = false,
+ OkMsg = undefined,
+ case delegate:invoke(QPid,
+ {gen_server2, call,
+ [{basic_consume, NoAck, ChPid, LimiterPid,
+ LimiterActive,
+ Prefetch, ConsumerTag,
+ ExclusiveConsume,
+ Args, OkMsg, ActingUser}, infinity]}) of
+ ok ->
+ {ok, State, []};
+ Err ->
+ Err
+ end.
+
+
+-ifdef(TEST).
+-include_lib("eunit/include/eunit.hrl").
+-endif.
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index 7c50e1fc7e..11fe18906e 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -7,6 +7,7 @@
in/4,
handle_queue_info/3,
handle_down/3,
+ begin_receive/4,
to_map/1
]).
@@ -37,8 +38,6 @@
-opaque state() :: #?MODULE{}.
-% -type msg() :: term().
-
%% the session state maintained by each channel for each queue
%% each queue type will have it's own state
-type queue_state() :: term().
@@ -87,11 +86,16 @@
-type deliveries() :: [{out_tag(), term()}].
%% placeholder to represent deliviers received from a queue
--type credit_def() :: {simple_prefetch, non_neg_integer()} | credited.
+-type credit_def() :: {simple_prefetch, non_neg_integer()} |
+ credited |
+ none.
--type receive_args() :: #{credit := credit_def(),
+-type receive_meta() :: #{credit := credit_def(),
+ acting_user := binary(),
atom() => term}.
+-type receive_tag() :: binary().
+
-type queue_def() :: term().
%% place holder for some means of configuring the queue - possibly
%% an amqqueue record
@@ -132,7 +136,7 @@
%% E.g. a "stream" queue could take an optional position argument to
%% specificy where in the log to begin streaming from.
-callback begin_receive(queue_id(), queue_state(),
- Tag :: term(), Args :: receive_args()) ->
+ Tag :: term(), Args :: receive_meta()) ->
{queue_state(), actions()}.
%% end a receive using the Tag
@@ -141,6 +145,13 @@
%% updates the message state for received messages
%% indicate if they are considered settled by the receiver
+%% TODO: need to support a variety of ways to update the message stae
+%% AMQP 0.9.1: single tag or multiple (up to)
+%% AMQP 1.0: range {first, last}
+%% MQTT 3.1.1: appears to only PUBACK one at a time
+%% STOMP: 1.2 also acks one at a time
+%% KAFKA: Polls in batches, range or sequence would probably do
+%% CoAP: may randomise sequence number - seqs may not be sequential
-callback update_msg_state(queue_id(),
queue_state(),
OutTags :: [out_tag()],
@@ -184,15 +195,26 @@ in(Destinations, SeqNum, Delivery,
end
end, {[], State0}, Destinations),
+ %% dispatch to each queue type implementation
+ {Queues, Actions} = lists:foldl(
+ fun(Qid, {Qus, As}) ->
+ #queue{module = Mod} = Q = maps:get(Qid, Qus),
+ {Q2, A} = Mod:in(Qid, Q, SeqNum, Delivery),
+ {Qus#{Qid => Q2}, A ++ As}
+ end, {State#?MODULE.queues, []}, QIds),
- %% * if the queue is new perform queue detail lookup and initialise
- %% by calling the impl init/1 function
- %% * stash incoming message in pending_in
- %% * foreach queue pass to `in' callback and aggregate actions
%% TODO: how to aggregate network calls for classic queues (delegate)
%% TODO: also credit flow???
- Pend = dtree:insert(SeqNum, QIds, Delivery, Pend0),
- {State#?MODULE{pending_in = Pend}, []}.
+ Pend = record_pending(SeqNum, QIds, Delivery, Pend0),
+ {State#?MODULE{pending_in = Pend,
+ queues = Queues}, Actions}.
+
+record_pending(undefined, _QIds, _Delivery, Pend) ->
+ Pend;
+record_pending(Seq, QIds, Delivery, Pend)
+ when is_integer(Seq) ->
+ dtree:insert(Seq, QIds, Delivery, Pend).
+
@@ -216,13 +238,14 @@ handle_queue_info(QueueId, Info, #?MODULE{queues = Queues} = State) ->
%% process any `settle' actions
%% handle actions
{Actions, Pend} =
- lists:foldl(fun({msg_state_update, accepted, Seqs} = Evt, {Acc, P}) ->
+ lists:foldl(fun({msg_state_update, DeliveryState, Seqs},
+ {Acc, P}) ->
case dtree:take(Seqs, QueueId, P) of
{[], P1} ->
{Acc, P1};
{Completed, P1} ->
CompletedSeqs = [S || {S, _} <- Completed],
- Evt = {msg_state_update, accepted,
+ Evt = {msg_state_update, DeliveryState,
CompletedSeqs},
{[Evt | Acc], P1}
end
@@ -230,6 +253,14 @@ handle_queue_info(QueueId, Info, #?MODULE{queues = Queues} = State) ->
{State#?MODULE{queues = Queues#{QueueId => Q#queue{state = Qs}},
pending_in = Pend}, lists:reverse(Actions)}.
+
+-spec begin_receive(queue_name(), state(),
+ receive_tag(), receive_meta()) ->
+ {ok, state(), actions()} | {error, duplicate}.
+begin_receive(_QName, State, _Tag, _Args) ->
+ % Module:handle_down(
+ {ok, State, []}.
+
to_map(State) ->
#{monitors => State#?MODULE.monitors,
pending_in => State#?MODULE.pending_in,
@@ -246,20 +277,25 @@ to_map(State) ->
-ifdef(TEST).
-include_lib("eunit/include/eunit.hrl").
-in_msg_is_accepted_test() ->
- QName = make_queue_name(?FUNCTION_NAME),
- meck:new(?FUNCTION_NAME, [non_strict]),
- meck:expect(?FUNCTION_NAME, init, fun (Q) -> {Q, []} end),
- meck:expect(?FUNCTION_NAME, in,
+setup_meck_impl(Mod) ->
+ meck:new(Mod, [non_strict]),
+ meck:expect(Mod, init, fun (Q) -> {Q, []} end),
+ meck:expect(Mod, in,
fun (_QId, Qs, _Seq, _Msg) ->
{Qs, []}
end),
- meck:expect(?FUNCTION_NAME, handle_queue_info,
+ meck:expect(Mod, handle_queue_info,
fun (_QId, {applied, Seqs}, Qs) when is_list(Seqs) ->
%% simply record as accepted
- {Qs, [{msg_state_update, accepted, Seqs}]}
+ {Qs, [{msg_state_update, accepted, Seqs}]};
+ (_QId, {rejected, Seqs}, Qs) when is_list(Seqs) ->
+ {Qs, [{msg_state_update, rejected, Seqs}]}
end),
+ ok.
+in_msg_is_accepted_test() ->
+ QName = make_queue_name(?FUNCTION_NAME),
+ setup_meck_impl(?FUNCTION_NAME),
LookupFun = fun(Name) ->
#{module => ?FUNCTION_NAME,
name => Name}
@@ -277,27 +313,32 @@ in_msg_is_accepted_test() ->
{Qs2, [{msg_state_update, accepted, [1]}]} =
rabbit_queue_type:handle_queue_info(QId, {applied, [1]}, Qs1),
+ {Qs2, [{msg_state_update, rejected, [1]}]} =
+ rabbit_queue_type:handle_queue_info(QId, {rejected, [1]}, Qs1),
%% no pending should remain inside the state after the queue has accepted
%% the message
?assertEqual(0, dtree:size(Qs2#?MODULE.pending_in)),
- % CustomerTag = UId,
+ ok.
+
+untracked_in_test() ->
+ QName = make_queue_name(?FUNCTION_NAME),
+ setup_meck_impl(?FUNCTION_NAME),
+ LookupFun = fun(Name) ->
+ #{module => ?FUNCTION_NAME,
+ name => Name}
+ end,
+ Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}),
+
+ {Qs1, []} = rabbit_queue_type:in([QName], undefined, some_delivery, Qs0),
+ #?MODULE{queues = Queues, pending_in = P1} = Qs1,
+ [_] = maps:keys(Queues),
+ ?assertEqual(0, dtree:size(P1)),
ok.
in_msg_multi_queue_is_accepted_test() ->
QName = make_queue_name(?FUNCTION_NAME),
QAlt = make_queue_name(alt_queue_name),
- meck:new(?FUNCTION_NAME, [non_strict]),
- meck:expect(?FUNCTION_NAME, init, fun (Q) -> {Q, []} end),
- meck:expect(?FUNCTION_NAME, in,
- fun (_QId, Qs, _Seq, _Msg) ->
- {Qs, []}
- end),
- meck:expect(?FUNCTION_NAME, handle_queue_info,
- fun (_QId, {applied, Seqs}, Qs) when is_list(Seqs) ->
- %% simply record as accepted
- {Qs, [{msg_state_update, accepted, Seqs}]}
- end),
-
+ setup_meck_impl(?FUNCTION_NAME),
LookupFun = fun(Name) ->
#{module => ?FUNCTION_NAME,
name => Name}
@@ -305,7 +346,7 @@ in_msg_multi_queue_is_accepted_test() ->
Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}),
{#?MODULE{queue_names = #{QName := QId1,
- QAlt := QId2}} =Qs1,
+ QAlt := QId2}} = Qs1,
[]} = rabbit_queue_type:in([QName, QAlt], 1, some_delivery, Qs0),
%% no msg_state_update should be issued for the first one
@@ -313,6 +354,30 @@ in_msg_multi_queue_is_accepted_test() ->
rabbit_queue_type:handle_queue_info(QId1, {applied, [1]}, Qs1),
{_, [{msg_state_update, accepted, [1]}]} =
rabbit_queue_type:handle_queue_info(QId2, {applied, [1]}, Qs2),
+ ?assert(meck:called(?FUNCTION_NAME, in, ['_', '_', '_', '_'])),
+ ?assertEqual(2, meck:num_calls(?FUNCTION_NAME, in, '_')),
+ meck:unload(),
+ ok.
+
+begin_end_receive_test() ->
+ QName = make_queue_name(?FUNCTION_NAME),
+ setup_meck_impl(?FUNCTION_NAME),
+ LookupFun = fun(Name) ->
+ #{module => ?FUNCTION_NAME,
+ name => Name}
+ end,
+ Qs0 = rabbit_queue_type:init(#{queue_lookup_fun => LookupFun}),
+ Tag = <<"my-tag">>,
+ Args = #{credit => {simple_prefetch, 10}},
+ {ok, Qs1, []} = rabbit_queue_type:begin_receive(QName, Qs0, Tag, Args),
+ %% Assert new queue state was set up and Mod:begin_receive was called
+ ?assert(meck:called(?FUNCTION_NAME, begin_receive, ['_', '_', '_', '_'])),
+ {error, duplicate} = rabbit_queue_type:begin_receive(QName, Qs0, Tag, Args),
+ {ok, _, []} = rabbit_queue_type:end_receive(QName, Qs1, Tag),
+ ?assert(meck:called(?FUNCTION_NAME, end_receive, ['_', '_', '_'])),
+ %% TODO:if there are no publishes and no other receiver tags does this
+ %% end the queue session?
+ %% TODO: handle calls to being_receive with tame tag (should error)
ok.
make_queue_name(Name) when is_atom(Name) ->