diff options
| -rw-r--r-- | src/rabbit_backing_queue_qc.erl | 122 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 352 |
4 files changed, 340 insertions, 175 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl index 706f54c0ab..095202dd11 100644 --- a/src/rabbit_backing_queue_qc.erl +++ b/src/rabbit_backing_queue_qc.erl @@ -37,19 +37,21 @@ -export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]). -record(state, {bqstate, - len, %% int - messages, %% queue of {msg_props, basic_msg} - acks, %% dict of acktag => {msg_props, basic_msg} - confirms}). %% set of msgid + len, %% int + next_seq_id, %% int + messages, %% gb_trees of seqid => {msg_props, basic_msg} + acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}] + confirms}). %% set of msgid %% Initialise model initial_state() -> - #state{bqstate = qc_variable_queue_init(qc_test_queue()), - len = 0, - messages = queue:new(), - acks = orddict:new(), - confirms = gb_sets:new()}. + #state{bqstate = qc_variable_queue_init(qc_test_queue()), + len = 0, + next_seq_id = 0, + messages = gb_trees:empty(), + acks = [], + confirms = gb_sets:new()}. %% Property @@ -123,11 +125,11 @@ qc_fetch(#state{bqstate = BQ}) -> {call, ?BQMOD, fetch, [boolean(), BQ]}. qc_ack(#state{bqstate = BQ, acks = Acks}) -> - {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}. + {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}. qc_requeue(#state{bqstate = BQ, acks = Acks}) -> {call, ?BQMOD, requeue, - [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. + [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}. 
qc_set_ram_duration_target(#state{bqstate = BQ}) -> {call, ?BQMOD, set_ram_duration_target, @@ -155,10 +157,10 @@ qc_purge(#state{bqstate = BQ}) -> precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg}) when Fun =:= ack; Fun =:= requeue -> - orddict:size(Acks) > 0; + length(Acks) > 0; precondition(#state{messages = Messages}, - {call, ?BQMOD, publish_delivered, _Arg}) -> - queue:is_empty(Messages); + {call, ?BQMOD, publish_delivered, _Arg}) -> + gb_trees:is_empty(Messages); precondition(_S, {call, ?BQMOD, _Fun, _Arg}) -> true; precondition(_S, {call, ?MODULE, timeout, _Arg}) -> @@ -169,14 +171,18 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) -> %% Model updates next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> - #state{len = Len, messages = Messages, confirms = Confirms} = S, + #state{len = Len, + messages = Messages, + confirms = Confirms, + next_seq_id = NextSeq} = S, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, NeedsConfirm = {call, erlang, element, [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, S#state{bqstate = BQ, len = Len + 1, - messages = queue:in({MsgProps, Msg}, Messages), + next_seq_id = NextSeq + 1, + messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages), confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms @@ -184,17 +190,19 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) -> next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) -> #state{len = Len, messages = Messages} = S, - Messages1 = repeat(Messages, fun(Msgs) -> - queue:in({MsgProps, Msg}, Msgs) - end, Count), - S#state{bqstate = BQ, - len = Len + Count, - messages = Messages1}; + {S1, Msgs1} = repeat({S, Messages}, + fun ({#state{next_seq_id = NextSeq} = State, Msgs}) -> + {State #state { next_seq_id = NextSeq + 1}, + gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)} + end, Count), + 
S1#state{bqstate = BQ, + len = Len + Count, + messages = Msgs1}; next_state(S, Res, {call, ?BQMOD, publish_delivered, [AckReq, Msg, MsgProps, _Pid, _BQ]}) -> - #state{confirms = Confirms, acks = Acks} = S, + #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S, AckTag = {call, erlang, element, [1, Res]}, BQ1 = {call, erlang, element, [2, Res]}, MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]}, @@ -202,12 +210,13 @@ next_state(S, Res, {call, erlang, element, [?RECORD_INDEX(needs_confirming, message_properties), MsgProps]}, S#state{bqstate = BQ1, + next_seq_id = NextSeq + 1, confirms = case eval(NeedsConfirm) of true -> gb_sets:add(MsgId, Confirms); _ -> Confirms end, acks = case AckReq of - true -> orddict:append(AckTag, {MsgProps, Msg}, Acks); + true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks]; false -> Acks end }; @@ -218,34 +227,36 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) -> BQ1 = {call, erlang, element, [2, Res]}, AckTag = {call, erlang, element, [3, ResultInfo]}, S1 = S#state{bqstate = BQ1}, - case queue:out(Messages) of - {empty, _M2} -> - S1; - {{value, MsgProp_Msg}, M2} -> - S2 = S1#state{len = Len - 1, messages = M2}, - case AckReq of - true -> - S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)}; - false -> - S2 - end + case gb_trees:is_empty(Messages) of + true -> S1; + false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages), + S2 = S1#state{len = Len - 1, messages = M2}, + case AckReq of + true -> + S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]}; + false -> + S2 + end end; next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) -> #state{acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, S#state{bqstate = BQ1, - acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) -> - #state{len = Len, messages = Messages, acks = 
AcksState} = S, + #state{messages = Messages, acks = AcksState} = S, BQ1 = {call, erlang, element, [2, Res]}, - RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) || - Key <- AcksArg]), + Messages1 = lists:foldl(fun (AckTag, Msgs) -> + {SeqId, MsgPropsMsg} = + proplists:get_value(AckTag, AcksState), + gb_trees:insert(SeqId, MsgPropsMsg, Msgs) + end, Messages, AcksArg), S#state{bqstate = BQ1, - len = Len + length(RequeueMsgs), - messages = queue:join(Messages, queue:from_list(RequeueMsgs)), - acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)}; + len = gb_trees:size(Messages1), + messages = Messages1, + acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)}; next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) -> S#state{bqstate = BQ}; @@ -260,8 +271,8 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) -> next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) -> #state{messages = Messages} = S, - Messages1 = drop_messages(Messages), - S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1}; + Msgs1 = drop_messages(Messages), + S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1}; next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) -> S; @@ -271,7 +282,7 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) -> next_state(S, Res, {call, ?BQMOD, purge, _Args}) -> BQ1 = {call, erlang, element, [2, Res]}, - S#state{bqstate = BQ1, len = 0, messages = queue:new()}. + S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}. 
%% Postconditions @@ -279,9 +290,9 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> #state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S, case Res of {{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} -> - {_MsgProps, Msg} = queue:head(Messages), + {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages), MsgFetched =:= Msg andalso - not orddict:is_key(AckTag, Acks) andalso + not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms) andalso RemainingLen =:= Len - 1; {empty, _BQ} -> @@ -290,7 +301,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) -> postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) -> #state{acks = Acks, confirms = Confrms} = S, - not orddict:is_key(AckTag, Acks) andalso + not proplists:is_defined(AckTag, Acks) andalso not gb_sets:is_element(AckTag, Confrms); postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) -> @@ -359,7 +370,14 @@ qc_test_queue(Durable) -> pid = self()}. rand_choice([]) -> []; -rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)]. +rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))). + +rand_choice(_List, Selection, 0) -> + Selection; +rand_choice(List, Selection, N) -> + Picked = lists:nth(random:uniform(length(List)), List), + rand_choice(List -- [Picked], [Picked | Selection], + N - 1). dropfun(Props) -> Expiry = eval({call, erlang, element, @@ -367,10 +385,10 @@ dropfun(Props) -> Expiry =/= 1. 
drop_messages(Messages) -> - case queue:out(Messages) of - {empty, _} -> + case gb_trees:is_empty(Messages) of + true -> Messages; - {{value, MsgProps_Msg}, M2} -> + false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages), MsgProps = {call, erlang, element, [1, MsgProps_Msg]}, case dropfun(MsgProps) of true -> drop_messages(M2); diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d2f5527726..9eb77c326a 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -836,14 +836,10 @@ handle_method(#'basic.recover_async'{requeue = true}, OkFun = fun () -> ok end, ok = fold_per_queue( fun (QPid, MsgIds, ok) -> - %% The Qpid python test suite incorrectly assumes - %% that messages will be requeued in their original - %% order. To keep it happy we reverse the id list - %% since we are given them in reverse order. rabbit_misc:with_exit_handler( OkFun, fun () -> rabbit_amqqueue:requeue( - QPid, lists:reverse(MsgIds), self()) + QPid, MsgIds, self()) end) end, ok, UAMQ), ok = notify_limiter(Limiter, UAMQ), diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 44c1337678..010279302e 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -2314,9 +2314,42 @@ test_variable_queue() -> fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1, fun test_dropwhile/1, fun test_dropwhile_varying_ram_duration/1, - fun test_variable_queue_ack_limiting/1]], + fun test_variable_queue_ack_limiting/1, + fun test_variable_queue_requeue/1]], passed. 
+test_variable_queue_requeue(VQ0) -> + Interval = 50, + Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval, + Seq = lists:seq(1, Count), + VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0), + VQ2 = variable_queue_publish(false, Count, VQ1), + {VQ3, Acks} = lists:foldl( + fun (_N, {VQN, AckTags}) -> + {{#basic_message{}, false, AckTag, _}, VQM} = + rabbit_variable_queue:fetch(true, VQN), + {VQM, [AckTag | AckTags]} + end, {VQ2, []}, Seq), + Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 -> + [Ack | Acc]; + (_, Acc) -> + Acc + end, [], lists:zip(Acks, Seq)), + {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset, + fun(X) -> X end, VQ3), + VQ5 = lists:foldl(fun (AckTag, VQN) -> + {_MsgId, VQM} = rabbit_variable_queue:requeue( + [AckTag], fun(X) -> X end, VQN), + VQM + end, VQ4, Subset), + VQ6 = lists:foldl(fun (AckTag, VQa) -> + {{#basic_message{}, true, AckTag, _}, VQb} = + rabbit_variable_queue:fetch(true, VQa), + VQb + end, VQ5, lists:reverse(Acks)), + {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6), + VQ7. + test_variable_queue_ack_limiting(VQ0) -> %% start by sending in a bunch of messages Len = 1024, diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 78d26c510e..d4f51f8d4a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -158,19 +158,18 @@ %% to search through qi segments looking for messages that are yet to %% be acknowledged. %% -%% Pending acks are recorded in memory either as the tuple {SeqId, -%% MsgId, MsgProps} (tuple-form) or as the message itself (message- -%% form). Acks for persistent messages are always stored in the tuple- -%% form. Acks for transient messages are also stored in tuple-form if -%% the message has been sent to disk as part of the memory reduction -%% process. For transient messages that haven't already been written -%% to disk, acks are stored in message-form. 
+%% Pending acks are recorded in memory by storing the message itself. +%% If the message has been sent to disk, we do not store the message +%% content. During memory reduction, pending acks containing message +%% content have that content removed and the corresponding messages +%% are pushed out to disk. %% -%% During memory reduction, acks stored in message-form are converted -%% to tuple-form, and the corresponding messages are pushed out to -%% disk. +%% Messages from pending acks are returned to q4, q3 and delta during +%% requeue, based on the limits of seq_id contained in each. Requeued +%% messages retain their original seq_id, maintaining order +%% when requeued. %% -%% The order in which alphas are pushed to betas and message-form acks +%% The order in which alphas are pushed to betas and pending acks %% are pushed to disk is determined dynamically. We always prefer to %% push messages for the source (alphas or acks) that is growing the %% fastest (with growth measured as avg. ingress - avg. egress). In @@ -281,6 +280,8 @@ end_seq_id %% end_seq_id is exclusive }). +-record(merge_funs, {new, join, out, in, publish}). + %% When we discover, on publish, that we should write some indices to %% disk for some betas, the IO_BATCH_SIZE sets the number of betas %% that we must be due to write indices for before we do any work at @@ -434,7 +435,7 @@ terminate(_Reason, State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(true, State), + purge_pending_ack(true, State), PRef = case MSCStateP of undefined -> undefined; _ -> ok = rabbit_msg_store:client_terminate(MSCStateP), @@ -450,12 +451,12 @@ terminate(_Reason, State) -> %% needs to delete everything that's been delivered and not ack'd. 
delete_and_terminate(_Reason, State) -> %% TODO: there is no need to interact with qi at all - which we do - %% as part of 'purge' and 'remove_pending_ack', other than + %% as part of 'purge' and 'purge_pending_ack', other than %% deleting it. {_PurgeCount, State1} = purge(State), State2 = #vqstate { index_state = IndexState, msg_store_clients = {MSCStateP, MSCStateT} } = - remove_pending_ack(false, State1), + purge_pending_ack(false, State1), IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), case MSCStateP of undefined -> ok; @@ -558,34 +559,51 @@ fetch(AckRequired, State) -> {Res, a(State3)} end. +ack([], State) -> + {[], State}; ack(AckTags, State) -> - {MsgIds, State1} = ack(fun msg_store_remove/3, - fun (_, State0) -> State0 end, - AckTags, State), - {MsgIds, a(State1)}. - -requeue(AckTags, MsgPropsFun, State) -> - MsgPropsFun1 = fun (MsgProps) -> - (MsgPropsFun(MsgProps)) #message_properties { - needs_confirming = false } - end, - {MsgIds, State1} = - ack(fun (_, _, _) -> ok end, - fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) -> - {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps), - true, false, State1), - State2; - ({IsPersistent, MsgId, MsgProps}, State1) -> - #vqstate { msg_store_clients = MSCState } = State1, - {{ok, Msg = #basic_message{}}, MSCState1} = - msg_store_read(MSCState, IsPersistent, MsgId), - State2 = State1 #vqstate { msg_store_clients = MSCState1 }, - {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps), - true, true, State2), - State3 - end, - AckTags, State), - {MsgIds, a(reduce_memory_use(State1))}. 
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + persistent_count = PCount, + ack_out_counter = AckOutCount }} = + lists:foldl( + fun (SeqId, {Acc, State2}) -> + {MsgStatus, State3} = remove_pending_ack(SeqId, State2), + {accumulate_ack(MsgStatus, Acc), State3} + end, {accumulate_ack_init(), State}, AckTags), + IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], + PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( + orddict:new(), MsgIdsByStore)), + {lists:reverse(AllMsgIds), + a(State1 #vqstate { index_state = IndexState1, + persistent_count = PCount1, + ack_out_counter = AckOutCount + length(AckTags) })}. + +requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta, + q3 = Q3, + q4 = Q4, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [], + beta_limit(Q3), + alpha_funs(), + MsgPropsFun, State), + {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds, + delta_limit(Delta), + beta_funs(), + MsgPropsFun, State1), + {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1, + MsgPropsFun, State2), + MsgCount = length(MsgIds2), + {MsgIds2, a(reduce_memory_use( + State3 #vqstate { delta = Delta1, + q3 = Q3a, + q4 = Q4a, + in_counter = InCounter + MsgCount, + len = Len + MsgCount }))}. len(#vqstate { len = Len }) -> Len. @@ -782,6 +800,8 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId }, msg_on_disk = false, index_on_disk = false, msg_props = MsgProps }. +trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }. 
+ with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) -> {Result, MSCStateP1} = Fun(MSCStateP), {Result, {MSCStateP1, MSCStateT}}; @@ -835,26 +855,30 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). -betas_from_index_entries(List, TransientThreshold, IndexState) -> +betas_from_index_entries(List, TransientThreshold, PA, IndexState) -> {Filtered, Delivers, Acks} = lists:foldr( fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}, - {Filtered1, Delivers1, Acks1}) -> + {Filtered1, Delivers1, Acks1} = Acc) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> {Filtered1, cons_if(not IsDelivered, SeqId, Delivers1), [SeqId | Acks1]}; - false -> {[m(#msg_status { msg = undefined, - msg_id = MsgId, - seq_id = SeqId, - is_persistent = IsPersistent, - is_delivered = IsDelivered, - msg_on_disk = true, - index_on_disk = true, - msg_props = MsgProps - }) | Filtered1], - Delivers1, - Acks1} + false -> case dict:is_key(SeqId, PA) of + false -> {[m(#msg_status { + seq_id = SeqId, + msg_id = MsgId, + msg = undefined, + is_persistent = IsPersistent, + is_delivered = IsDelivered, + msg_on_disk = true, + index_on_disk = true, + msg_props = MsgProps + }) | Filtered1], + Delivers1, + Acks1}; + true -> Acc + end end end, {[], [], []}, List), {bpqueue:from_list([{true, Filtered}]), @@ -1015,11 +1039,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status { end, Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end, IndexState2 = - case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of - {false, true, false, _} -> Rem(), IndexState1; - {false, true, true, _} -> Rem(), Ack(); - { true, true, true, false} -> Ack(); - _ -> IndexState1 + case {AckRequired, MsgOnDisk, IndexOnDisk} of + {false, true, false} -> Rem(), IndexState1; + {false, true, true} -> Rem(), Ack(); + _ -> IndexState1 end, %% 3. 
If an ack is required, add something sensible to PA @@ -1167,15 +1190,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, record_pending_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, - is_persistent = IsPersistent, - msg_on_disk = MsgOnDisk, - msg_props = MsgProps } = MsgStatus, + msg_on_disk = MsgOnDisk } = MsgStatus, State = #vqstate { pending_ack = PA, ram_ack_index = RAI, ack_in_counter = AckInCount}) -> {AckEntry, RAI1} = case MsgOnDisk of - true -> {{IsPersistent, MsgId, MsgProps}, RAI}; + true -> {m(trim_msg_status(MsgStatus)), RAI}; false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)} end, PA1 = dict:store(SeqId, AckEntry, PA), @@ -1183,12 +1204,20 @@ record_pending_ack(#msg_status { seq_id = SeqId, ram_ack_index = RAI1, ack_in_counter = AckInCount + 1}. -remove_pending_ack(KeepPersistent, - State = #vqstate { pending_ack = PA, - index_state = IndexState, - msg_store_clients = MSCState }) -> - {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} = - dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA), +remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA, + ram_ack_index = RAI }) -> + {dict:fetch(SeqId, PA), + State #vqstate { pending_ack = dict:erase(SeqId, PA), + ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}. 
+ +purge_pending_ack(KeepPersistent, + State = #vqstate { pending_ack = PA, + index_state = IndexState, + msg_store_clients = MSCState }) -> + {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = + dict:fold(fun (_SeqId, MsgStatus, Acc) -> + accumulate_ack(MsgStatus, Acc) + end, accumulate_ack_init(), PA), State1 = State #vqstate { pending_ack = dict:new(), ram_ack_index = gb_trees:empty() }, case KeepPersistent of @@ -1199,52 +1228,25 @@ remove_pending_ack(KeepPersistent, State1 end; false -> IndexState1 = - rabbit_queue_index:ack(PersistentSeqIds, IndexState), + rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], State1 #vqstate { index_state = IndexState1 } end. -ack(_MsgStoreFun, _Fun, [], State) -> - {[], State}; -ack(MsgStoreFun, Fun, AckTags, State) -> - {{PersistentSeqIds, MsgIdsByStore, AllMsgIds}, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - persistent_count = PCount, - ack_out_counter = AckOutCount }} = - lists:foldl( - fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA, - ram_ack_index = RAI }}) -> - AckEntry = dict:fetch(SeqId, PA), - {accumulate_ack(SeqId, AckEntry, Acc), - Fun(AckEntry, State2 #vqstate { - pending_ack = dict:erase(SeqId, PA), - ram_ack_index = - gb_trees:delete_any(SeqId, RAI)})} - end, {accumulate_ack_init(), State}, AckTags), - IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState), - [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len( - orddict:new(), MsgIdsByStore)), - {lists:reverse(AllMsgIds), - State1 #vqstate { index_state = IndexState1, - persistent_count = PCount1, - ack_out_counter = AckOutCount + length(AckTags) }}. - accumulate_ack_init() -> {[], orddict:new(), []}. 
-accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS - msg_on_disk = false, - index_on_disk = false, - msg_id = MsgId }, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]}; -accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps}, - {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> - {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc), - rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore), +accumulate_ack(#msg_status { seq_id = SeqId, + msg_id = MsgId, + is_persistent = IsPersistent, + msg_on_disk = MsgOnDisk, + index_on_disk = IndexOnDisk }, + {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> + {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), + case MsgOnDisk of + true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + false -> MsgIdsByStore + end, [MsgId | AllMsgIds]}. find_persistent_count(LensByStore) -> @@ -1319,6 +1321,122 @@ msg_indices_written_to_disk(Callback, MsgIdSet) -> end). %%---------------------------------------------------------------------------- +%% Internal plumbing for requeue +%%---------------------------------------------------------------------------- + +alpha_funs() -> + #merge_funs { + new = fun queue:new/0, + join = fun queue:join/2, + out = fun queue:out/1, + in = fun queue:in/2, + publish = fun (#msg_status { msg = undefined } = MsgStatus, State) -> + read_msg(MsgStatus, State); + (MsgStatus, #vqstate { + ram_msg_count = RamMsgCount } = State) -> + {MsgStatus, State #vqstate { + ram_msg_count = RamMsgCount + 1 }} + end}. 
%% Merge callbacks used when requeueing into q3. q3 is handled through
%% the bpqueue module; entries are enqueued under their index_on_disk
%% flag and the prefix is dropped again when an entry is taken out.
beta_funs() ->
    %% Take the next entry, hiding the bpqueue prefix so the result has
    %% the same shape as queue:out/1.
    Out = fun (BPQ) ->
                  case bpqueue:out(BPQ) of
                      {empty, _BPQ1} = Empty ->
                          Empty;
                      {{value, _Prefix, Status}, BPQ1} ->
                          {{value, Status}, BPQ1}
                  end
          end,
    In = fun (#msg_status { index_on_disk = Prefix } = Status, BPQ) ->
                 bpqueue:in(Prefix, Status, BPQ)
         end,
    %% Ask for the message body to be written when it is not on disk
    %% yet (first argument of maybe_write_to_disk/4), never force the
    %% index, then account for whatever is still held in RAM.
    Publish = fun (#msg_status { msg_on_disk = OnDisk } = Status, VQS) ->
                      {#msg_status { msg           = Msg,
                                     index_on_disk = IdxOnDisk } = Status1,
                       #vqstate { ram_msg_count   = RamMsgs,
                                  ram_index_count = RamIdxs } = VQS1} =
                          maybe_write_to_disk(not OnDisk, false, Status, VQS),
                      RamMsgs1 = RamMsgs + one_if(Msg =/= undefined),
                      RamIdxs1 = RamIdxs + one_if(not IdxOnDisk),
                      {Status1, VQS1 #vqstate { ram_msg_count   = RamMsgs1,
                                                ram_index_count = RamIdxs1 }}
              end,
    #merge_funs { new     = fun bpqueue:new/0,
                  join    = fun bpqueue:join/2,
                  out     = Out,
                  in      = In,
                  publish = Publish }.

%% Merge pending acks (SeqIds, expected in ascending order) back into
%% queue Q so that entries stay ordered by seq_id. Only sequence ids
%% below Limit are merged; Limit = undefined means "no limit".
%%
%% Returns {RemainingSeqIds, MergedQ, MsgIds, State}, where MsgIds has
%% the msg_id of every merged pending ack prepended to it.
queue_merge(SeqIds, Q, MsgIds, Limit, Funs = #merge_funs { new = NewQ },
            MsgPropsFun, State) ->
    queue_merge(SeqIds, Q, NewQ(), MsgIds, Limit, Funs, MsgPropsFun, State).

%% Merged accumulates the already-ordered front of the result; whatever
%% is left of Q is joined behind it once merging stops.
queue_merge(Pending = [Next | Later], Q, Merged, MsgIds, Limit,
            Funs = #merge_funs { out = Out, in = In, publish = Publish },
            MsgPropsFun, State)
  when Limit =:= undefined orelse Next < Limit ->
    case Out(Q) of
        {{value, Queued = #msg_status { seq_id = QueuedSeqId }}, QRest}
          when QueuedSeqId < Next ->
            %% The head of Q sorts before the next pending ack: move it
            %% across and look at the same pending ack again.
            queue_merge(Pending, QRest, In(Queued, Merged), MsgIds,
                        Limit, Funs, MsgPropsFun, State);
        {_, _} ->
            %% Q is empty or its head sorts after the pending ack: the
            %% pending ack goes in next and Q is left untouched.
            {Acked, State1} = msg_from_pending_ack(Next, MsgPropsFun, State),
            {Published = #msg_status { msg_id = MsgId }, State2} =
                Publish(Acked, State1),
            queue_merge(Later, Q, In(Published, Merged), [MsgId | MsgIds],
                        Limit, Funs, MsgPropsFun, State2)
    end;
queue_merge(SeqIds, Q, Merged, MsgIds, _Limit, #merge_funs { join = Join },
            _MsgPropsFun, State) ->
    %% Nothing left to merge, or the next sequence id is at/after Limit.
    {SeqIds, Join(Merged, Q), MsgIds, State}.
%% Requeue the pending acks in SeqIds into delta.
%%
%% For every sequence id the pending ack is removed from the state via
%% msg_from_pending_ack/3, maybe_write_to_disk/4 is asked to write
%% whichever of the message body and index entry is not on disk yet,
%% and delta is widened to cover the sequence id.
%%
%% Returns {Delta1, MsgIds1, State1}; the msg_id of each requeued
%% message is prepended to MsgIds.
delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
    {Delta, MsgIds, State};
delta_merge(SeqIds, #delta {} = Delta, MsgIds, MsgPropsFun, State) ->
    lists:foldl(
      %% The bounds and count are read from the accumulator Delta0, not
      %% from the initial Delta, so that every sequence id builds on the
      %% update made for the previous one.
      fun (SeqId, {#delta { start_seq_id = StartSeqId,
                            count        = Count,
                            end_seq_id   = EndSeqId } = Delta0,
                   MsgIds0, State0}) ->
              {#msg_status { msg_id        = MsgId,
                             index_on_disk = IndexOnDisk,
                             msg_on_disk   = MsgOnDisk } = MsgStatus,
               State1} =
                  msg_from_pending_ack(SeqId, MsgPropsFun, State0),
              {_MsgStatus, State2} =
                  maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
                                      MsgStatus, State1),
              %% end_seq_id is exclusive, hence SeqId + 1.
              {Delta0 #delta {
                 start_seq_id = lists:min([SeqId, StartSeqId]),
                 count        = Count + 1,
                 end_seq_id   = lists:max([SeqId + 1, EndSeqId]) },
               [MsgId | MsgIds0], State2}
      end, {Delta, MsgIds, State}, SeqIds).

%% Mostly opposite of record_pending_ack/2.
%%
%% Removes the pending ack stored under SeqId and returns its
%% #msg_status{} with msg_props passed through MsgPropsFun and
%% needs_confirming forced to false, together with the updated state.
msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
    {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
        remove_pending_ack(SeqId, State),
    {MsgStatus #msg_status {
       msg_props = (MsgPropsFun(MsgProps)) #message_properties {
                     needs_confirming = false } }, State1}.

%% seq_id of the first entry of the given bpqueue, or 'undefined' when
%% the queue is empty.
beta_limit(BPQ) ->
    case bpqueue:out(BPQ) of
        {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
        {empty, _BPQ}                                            -> undefined
    end.

%% start_seq_id of delta, or 'undefined' when delta matches
%% ?BLANK_DELTA_PATTERN.
delta_limit(?BLANK_DELTA_PATTERN(_X))             -> undefined;
delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+ +%%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- @@ -1394,12 +1512,11 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA, {Quota, State}; false -> {SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI), - MsgStatus = #msg_status { - msg_id = MsgId, %% ASSERTION - is_persistent = false, %% ASSERTION - msg_props = MsgProps } = dict:fetch(SeqId, PA), - {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State), - PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA), + MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} = + dict:fetch(SeqId, PA), + {MsgStatus1, State1} = + maybe_write_to_disk(true, false, MsgStatus, State), + PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA), limit_ram_acks(Quota - 1, State1 #vqstate { pending_ack = PA1, ram_ack_index = RAI1 }) @@ -1505,6 +1622,7 @@ maybe_deltas_to_betas(State = #vqstate { delta = Delta, q3 = Q3, index_state = IndexState, + pending_ack = PA, transient_threshold = TransientThreshold }) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, @@ -1515,7 +1633,7 @@ maybe_deltas_to_betas(State = #vqstate { {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), {Q3a, IndexState2} = - betas_from_index_entries(List, TransientThreshold, IndexState1), + betas_from_index_entries(List, TransientThreshold, PA, IndexState1), State1 = State #vqstate { index_state = IndexState2 }, case bpqueue:len(Q3a) of 0 -> @@ -1587,9 +1705,9 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = maybe_write_to_disk(true, false, MsgStatus, State), - MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }), + MsgStatus2 = m(trim_msg_status(MsgStatus1)), RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), - State2 = State1 #vqstate { ram_msg_count = 
RamMsgCount - 1, + State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa, Consumer(MsgStatus2, Qa, State2)) |
