diff options
| author | Michael Klishin <michael@novemberain.com> | 2015-11-18 15:49:27 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@novemberain.com> | 2015-11-18 15:49:27 +0300 |
| commit | 4b09cda42c32dd21a6880fdc52cd9da663c77432 (patch) | |
| tree | 80dc562389c1d3ca4efd6e3b6be3acdf63976869 | |
| parent | 52ce9fa48c596424a39059d13aad3c3183d42cee (diff) | |
| parent | 8928a29b43322e6dee9f3b5a6e9b546a93047a89 (diff) | |
| download | rabbitmq-server-git-4b09cda42c32dd21a6880fdc52cd9da663c77432.tar.gz | |
Merge pull request #377 from rabbitmq/rabbitmq-server-351
Implements Lazy Queues
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 10 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_master.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 283 |
6 files changed, 306 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 452047fdb2..ee331180ed 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -318,7 +318,8 @@ process_args_policy(State = #q{q = Q, {<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2}, {<<"message-ttl">>, fun res_min/2, fun init_ttl/2}, {<<"max-length">>, fun res_min/2, fun init_max_length/2}, - {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}], + {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}, + {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}], drop_expired_msgs( lists:foldl(fun({Name, Resolve, Fun}, StateN) -> Fun(args_policy_lookup(Name, Resolve, Q), StateN) @@ -361,6 +362,13 @@ init_max_bytes(MaxBytes, State) -> {_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}), State1. +init_queue_mode(undefined, State) -> + State; +init_queue_mode(Mode, State = #q {backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS), + State#q{backing_queue_state = BQS1}. + reply(Reply, NewState) -> {NewState1, Timeout} = next_state(NewState), {reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}. diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index ee3a097a80..e63ee107b0 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -23,7 +23,7 @@ len/1, is_empty/1, depth/1, drain_confirmed/1, dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, - msg_rates/1, info/2, invoke/3, is_duplicate/2]). + msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). -export([start/1, stop/0, delete_crashed/1]). @@ -485,6 +485,13 @@ is_duplicate(Message = #basic_message { id = MsgId }, confirmed = [MsgId | Confirmed] }} end. +set_queue_mode(Mode, State = #state { gm = GM, + backing_queue = BQ, + backing_queue_state = BQS }) -> + ok = gm:broadcast(GM, {set_queue_mode, Mode}), + BQS1 = BQ:set_queue_mode(Mode, BQS), + State #state { backing_queue_state = BQS1 }. + %% --------------------------------------------------------------------------- %% Other exported functions %% --------------------------------------------------------------------------- diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index d68852f336..225c21dd54 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -942,7 +942,12 @@ process_instruction({delete_and_terminate, Reason}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQ:delete_and_terminate(Reason, BQS), - {stop, State #state { backing_queue_state = undefined }}. + {stop, State #state { backing_queue_state = undefined }}; +process_instruction({set_queue_mode, Mode}, + State = #state { backing_queue = BQ, + backing_queue_state = BQS }) -> + BQS1 = BQ:set_queue_mode(Mode, BQS), + {ok, State #state { backing_queue_state = BQS1 }}. maybe_flow_ack(Sender, flow) -> credit_flow:ack(Sender); maybe_flow_ack(_Sender, noflow) -> ok. diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 65f3801e3e..a4e1e9be4a 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -35,7 +35,8 @@ register() -> {policy_validator, <<"message-ttl">>}, {policy_validator, <<"expires">>}, {policy_validator, <<"max-length">>}, - {policy_validator, <<"max-length-bytes">>}]], + {policy_validator, <<"max-length-bytes">>}, + {policy_validator, <<"queue-mode">>}]], ok. validate_policy(Terms) -> @@ -83,4 +84,11 @@ validate_policy0(<<"max-length-bytes">>, Value) when is_integer(Value), Value >= 0 -> ok; validate_policy0(<<"max-length-bytes">>, Value) -> - {error, "~p is not a valid maximum length in bytes", [Value]}. + {error, "~p is not a valid maximum length in bytes", [Value]}; + +validate_policy0(<<"queue-mode">>, <<"default">>) -> + ok; +validate_policy0(<<"queue-mode">>, <<"lazy">>) -> + ok; +validate_policy0(<<"queue-mode">>, Value) -> + {error, "~p is not a valid queue-mode value", [Value]}. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 4d638b334a..46a3991d88 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -40,7 +40,7 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2]). %% for rabbit_mirror_queue_sync. -export([partition_publish_delivered_batch/1]). @@ -430,6 +430,11 @@ is_duplicate(Msg, State = #state{bq = BQ}) -> is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough2(is_duplicate(Msg, BQS)). +set_queue_mode(Mode, State = #state{bq = BQ}) -> + foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State); +set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) -> + ?passthrough1(set_queue_mode(Mode, BQS)). + %%---------------------------------------------------------------------------- bq() -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index d1f45ade4f..19878580db 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -25,7 +25,8 @@ ackfold/4, fold/3, len/1, is_empty/1, depth/1, set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1, msg_rates/1, - info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]). + info/2, invoke/3, is_duplicate/2, set_queue_mode/2, + multiple_routing_keys/0]). -export([start/1, stop/0]). @@ -302,7 +303,10 @@ disk_read_count, disk_write_count, - io_batch_size + io_batch_size, + + %% default queue or lazy queue + mode }). -record(rates, { in, out, ack_in, ack_out, timestamp }). @@ -399,7 +403,8 @@ disk_read_count :: non_neg_integer(), disk_write_count :: non_neg_integer(), - io_batch_size :: pos_integer()}). + io_batch_size :: pos_integer(), + mode :: 'default' | 'lazy' }). %% Duplicated from rabbit_backing_queue -spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}). @@ -667,7 +672,8 @@ ack(AckTags, State) -> a(State1 #vqstate { index_state = IndexState1, ack_out_counter = AckOutCount + length(AckTags) })}. -requeue(AckTags, #vqstate { delta = Delta, +requeue(AckTags, #vqstate { mode = default, + delta = Delta, q3 = Q3, q4 = Q4, in_counter = InCounter, @@ -687,6 +693,23 @@ requeue(AckTags, #vqstate { delta = Delta, q3 = Q3a, q4 = Q4a, in_counter = InCounter + MsgCount, + len = Len + MsgCount })))}; +requeue(AckTags, #vqstate { mode = lazy, + delta = Delta, + q3 = Q3, + in_counter = InCounter, + len = Len } = State) -> + {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [], + delta_limit(Delta), + fun publish_beta/2, State), + {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds, + State1), + MsgCount = length(MsgIds1), + {MsgIds1, a(reduce_memory_use( + maybe_update_rates( + State2 #vqstate { delta = Delta1, + q3 = Q3a, + in_counter = InCounter + MsgCount, len = Len + MsgCount })))}. ackfold(MsgFun, Acc, State, AckTags) -> @@ -852,6 +875,7 @@ info(disk_writes, #vqstate{disk_write_count = Count}) -> Count; info(backing_queue_status, #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = Mode, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -860,7 +884,8 @@ info(backing_queue_status, #vqstate { ack_in = AvgAckIngressRate, ack_out = AvgAckEgressRate }}) -> - [ {q1 , ?QUEUE:len(Q1)}, + [ {mode , Mode}, + {q1 , ?QUEUE:len(Q1)}, {q2 , ?QUEUE:len(Q2)}, {delta , Delta}, {q3 , ?QUEUE:len(Q3)}, @@ -880,6 +905,51 @@ invoke( _, _, State) -> State. is_duplicate(_Msg, State) -> {false, State}. +set_queue_mode(Mode, State = #vqstate { mode = Mode }) -> + State; +set_queue_mode(lazy, State = #vqstate { + target_ram_count = TargetRamCount }) -> + %% To become a lazy queue we need to page everything to disk first. + State1 = convert_to_lazy(State), + %% restore the original target_ram_count + a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount }); +set_queue_mode(default, State) -> + %% becoming a default queue means loading messages from disk like + %% when a queue is recovered. + a(maybe_deltas_to_betas(State #vqstate { mode = default })); +set_queue_mode(_, State) -> + State. + +convert_to_lazy(State) -> + State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } = + set_ram_duration_target(0, State), + case Delta#delta.count + ?QUEUE:len(Q3) == Len of + true -> + State1; + false -> + %% When pushing messages to disk, we might have been + %% blocked by the msg_store, so we need to see if we have + %% to wait for more credit, and then keep paging messages. + %% + %% The amqqueue_process could have taken care of this, but + %% between the time it receives the bump_credit msg and + %% calls BQ:resume to keep paging messages to disk, some + %% other request may arrive to the BQ which at this moment + %% is not in a proper state for a lazy BQ (unless all + %% messages have been paged to disk already). + wait_for_msg_store_credit(), + convert_to_lazy(State1) + end. + +wait_for_msg_store_credit() -> + case credit_flow:blocked() of + true -> receive + {bump_credit, Msg} -> + credit_flow:handle_bump_msg(Msg) + end; + false -> ok + end. + %% Get the Timestamp property of the first msg, if present. This is %% the one with the oldest timestamp among the heads of the pending %% acks and unread queues. We can't check disk_pending_acks as these @@ -935,8 +1005,8 @@ get_collection_head(Col, IsEmpty, GetVal) -> %%---------------------------------------------------------------------------- %% Minor helpers %%---------------------------------------------------------------------------- - a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = default, len = Len, bytes = Bytes, unacked_bytes = UnackedBytes, @@ -951,9 +1021,16 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, E4 = ?QUEUE:is_empty(Q4), LZ = Len == 0, + %% if q1 has messages then q3 cannot be empty. See publish/6. true = E1 or not E3, + %% if q2 has messages then we have messages in delta (paged to + %% disk). See push_alphas_to_betas/2. true = E2 or not ED, + %% if delta has messages then q3 cannot be empty. This is enforced + %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages + %% are always kept on RAM. true = ED or not E3, + %% if the queue length is 0, then q3 and q4 must be empty. true = LZ == (E3 and E4), true = Len >= 0, @@ -966,6 +1043,53 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, true = RamBytes >= 0, true = RamBytes =< Bytes + UnackedBytes, + State; +a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, + mode = lazy, + len = Len, + bytes = Bytes, + unacked_bytes = UnackedBytes, + persistent_count = PersistentCount, + persistent_bytes = PersistentBytes, + ram_msg_count = RamMsgCount, + ram_bytes = RamBytes}) -> + E1 = ?QUEUE:is_empty(Q1), + E2 = ?QUEUE:is_empty(Q2), + ED = Delta#delta.count == 0, + E3 = ?QUEUE:is_empty(Q3), + E4 = ?QUEUE:is_empty(Q4), + LZ = Len == 0, + L3 = ?QUEUE:len(Q3), + + %% q1 must always be empty, since q1 only gets messages during + %% publish, but for lazy queues messages go straight to delta. + true = E1, + + %% q2 only gets messages from q1 when push_alphas_to_betas is + %% called for a non empty delta, which won't be the case for a + %% lazy queue. This means q2 must always be empty. + true = E2, + + %% q4 must always be empty, since q1 only gets messages during + %% publish, but for lazy queues messages go straight to delta. + true = E4, + + %% if the queue is empty, then delta is empty and q3 is empty. + true = LZ == (ED and E3), + + %% There should be no messages in q1, q2, and q4 + true = Delta#delta.count + L3 == Len, + + true = Len >= 0, + true = Bytes >= 0, + true = UnackedBytes >= 0, + true = PersistentCount >= 0, + true = PersistentBytes >= 0, + true = RamMsgCount >= 0, + true = RamMsgCount =< Len, + true = RamBytes >= 0, + true = RamBytes =< Bytes + UnackedBytes, + State. d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End }) @@ -1203,7 +1327,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms, disk_read_count = 0, disk_write_count = 0, - io_batch_size = IoBatchSize }, + io_batch_size = IoBatchSize, + + mode = default }, a(maybe_deltas_to_betas(State)). blank_rates(Now) -> @@ -1214,7 +1340,7 @@ blank_rates(Now) -> timestamp = Now}. in_r(MsgStatus = #msg_status { msg = undefined }, - State = #vqstate { q3 = Q3, q4 = Q4 }) -> + State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) -> case ?QUEUE:is_empty(Q4) of true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }; false -> {Msg, State1 = #vqstate { q4 = Q4a }} = @@ -1223,10 +1349,24 @@ in_r(MsgStatus = #msg_status { msg = undefined }, stats(ready0, {MsgStatus, MsgStatus1}, State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) }) end; -in_r(MsgStatus, State = #vqstate { q4 = Q4 }) -> - State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }. +in_r(MsgStatus, + State = #vqstate { mode = default, q4 = Q4 }) -> + State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }; +%% lazy queues +in_r(MsgStatus = #msg_status { seq_id = SeqId }, + State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) -> + case ?QUEUE:is_empty(Q3) of + true -> + {_MsgStatus1, State1} = + maybe_write_to_disk(true, true, MsgStatus, State), + State2 = stats(ready0, {MsgStatus, none}, State1), + Delta1 = expand_delta(SeqId, Delta), + State2 #vqstate{ delta = Delta1 }; + false -> + State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } + end. -queue_out(State = #vqstate { q4 = Q4 }) -> +queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of @@ -1235,6 +1375,12 @@ queue_out(State = #vqstate { q4 = Q4 }) -> end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end; +%% lazy queues +queue_out(State = #vqstate { mode = lazy }) -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end. read_msg(#msg_status{msg = undefined, @@ -1254,11 +1400,13 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState, stats(Signs, Statuses, State) -> stats0(expand_signs(Signs), expand_statuses(Statuses), State). -expand_signs(ready0) -> {0, 0, true}; -expand_signs({A, B}) -> {A, B, false}. +expand_signs(ready0) -> {0, 0, true}; +expand_signs(lazy_pub) -> {1, 0, true}; +expand_signs({A, B}) -> {A, B, false}. expand_statuses({none, A}) -> {false, msg_in_ram(A), A}; expand_statuses({B, none}) -> {msg_in_ram(B), false, B}; +expand_statuses({lazy, A}) -> {false , false, A}; expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}. %% In this function at least, we are religious: the variable name @@ -1546,10 +1694,12 @@ process_delivers_and_acks_fun(_) -> %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- + publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, IsDelivered, _ChPid, _Flow, PersistFun, State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + mode = default, qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, in_counter = InCount, @@ -1567,6 +1717,26 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, stats({1, 0}, {none, MsgStatus1}, State2#vqstate{ next_seq_id = SeqId + 1, in_counter = InCount1, + unconfirmed = UC1 }); +publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId }, + MsgProps = #message_properties { needs_confirming = NeedsConfirming }, + IsDelivered, _ChPid, _Flow, PersistFun, + State = #vqstate { mode = lazy, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC, + delta = Delta }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), + Delta1 = expand_delta(SeqId, Delta), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + stats(lazy_pub, {lazy, m(MsgStatus1)}, + State1#vqstate{ delta = Delta1, + next_seq_id = SeqId + 1, + in_counter = InCount + 1, unconfirmed = UC1 }). batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) -> @@ -1578,7 +1748,8 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, PersistFun, - State = #vqstate { qi_embed_msgs_below = IndexMaxSize, + State = #vqstate { mode = default, + qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, out_counter = OutCount, in_counter = InCount, @@ -1594,6 +1765,29 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, out_counter = OutCount + 1, in_counter = InCount + 1, unconfirmed = UC1 }), + {SeqId, State3}; +publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent, + id = MsgId }, + MsgProps = #message_properties { + needs_confirming = NeedsConfirming }, + _ChPid, _Flow, PersistFun, + State = #vqstate { mode = lazy, + qi_embed_msgs_below = IndexMaxSize, + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + durable = IsDurable, + unconfirmed = UC }) -> + IsPersistent1 = IsDurable andalso IsPersistent, + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State), + State2 = record_pending_ack(m(MsgStatus1), State1), + UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC), + State3 = stats({0, 1}, {none, MsgStatus1}, + State2 #vqstate { next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1, + unconfirmed = UC1 }), {SeqId, State3}. batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> @@ -2063,6 +2257,7 @@ ifold(Fun, Acc, Its, State) -> reduce_memory_use(State = #vqstate { target_ram_count = infinity }) -> State; reduce_memory_use(State = #vqstate { + mode = default, ram_pending_ack = RPA, ram_msg_count = RamMsgCount, target_ram_count = TargetRamCount, @@ -2108,6 +2303,30 @@ reduce_memory_use(State = #vqstate { end, %% See rabbitmq-server-290 for the reasons behind this GC call. garbage_collect(), + State3; +%% When using lazy queues, there are no alphas, so we don't need to +%% call push_alphas_to_betas/2. +reduce_memory_use(State = #vqstate { + mode = lazy, + ram_pending_ack = RPA, + ram_msg_count = RamMsgCount, + target_ram_count = TargetRamCount }) -> + State1 = #vqstate { q3 = Q3 } = + case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of + 0 -> State; + S1 -> {_, State2} = limit_ram_acks(S1, State), + State2 + end, + + State3 = + case chunk_size(?QUEUE:len(Q3), + permitted_beta_count(State1)) of + 0 -> + State1; + S2 -> + push_betas_to_deltas(S2, State1) + end, + garbage_collect(), State3. limit_ram_acks(0, State) -> @@ -2131,6 +2350,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA, permitted_beta_count(#vqstate { len = 0 }) -> infinity; +permitted_beta_count(#vqstate { mode = lazy, + target_ram_count = TargetRamCount}) -> + TargetRamCount; permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) -> lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]); permitted_beta_count(#vqstate { q1 = Q1, @@ -2148,7 +2370,8 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> Current - Permitted. -fetch_from_q3(State = #vqstate { q1 = Q1, +fetch_from_q3(State = #vqstate { mode = default, + q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, @@ -2178,6 +2401,19 @@ fetch_from_q3(State = #vqstate { q1 = Q1, State1 end, {loaded, {MsgStatus, State2}} + end; +%% lazy queues +fetch_from_q3(State = #vqstate { mode = lazy, + delta = #delta { count = DeltaCount }, + q3 = Q3 }) -> + case ?QUEUE:out(Q3) of + {empty, _Q3} when DeltaCount =:= 0 -> + {empty, State}; + {empty, _Q3} -> + fetch_from_q3(maybe_deltas_to_betas(State)); + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + {loaded, {MsgStatus, State1}} end. maybe_deltas_to_betas(State) -> @@ -2286,7 +2522,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) -> end end. -push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, +push_betas_to_deltas(Quota, State = #vqstate { mode = default, + q2 = Q2, delta = Delta, q3 = Q3}) -> PushState = {Quota, Delta, State}, @@ -2301,8 +2538,22 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2, {_, Delta1, State1} = PushState2, State1 #vqstate { q2 = Q2a, delta = Delta1, + q3 = Q3a }; +%% In the case of lazy queues we want to page as many messages as +%% possible from q3. +push_betas_to_deltas(Quota, State = #vqstate { mode = lazy, + delta = Delta, + q3 = Q3}) -> + PushState = {Quota, Delta, State}, + {Q3a, PushState1} = push_betas_to_deltas( + fun ?QUEUE:out_r/1, + fun (Q2MinSeqId) -> Q2MinSeqId end, + Q3, PushState), + {_, Delta1, State1} = PushState1, + State1 #vqstate { delta = Delta1, q3 = Q3a }. + push_betas_to_deltas(Generator, LimitFun, Q, PushState) -> case ?QUEUE:is_empty(Q) of true -> |
