summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
author Michael Klishin <michael@novemberain.com> 2015-11-18 15:49:27 +0300
committer Michael Klishin <michael@novemberain.com> 2015-11-18 15:49:27 +0300
commit 4b09cda42c32dd21a6880fdc52cd9da663c77432 (patch)
tree 80dc562389c1d3ca4efd6e3b6be3acdf63976869
parent 52ce9fa48c596424a39059d13aad3c3183d42cee (diff)
parent 8928a29b43322e6dee9f3b5a6e9b546a93047a89 (diff)
download rabbitmq-server-git-4b09cda42c32dd21a6880fdc52cd9da663c77432.tar.gz
Merge pull request #377 from rabbitmq/rabbitmq-server-351
Implements Lazy Queues
-rw-r--r-- src/rabbit_amqqueue_process.erl 10
-rw-r--r-- src/rabbit_mirror_queue_master.erl 9
-rw-r--r-- src/rabbit_mirror_queue_slave.erl 7
-rw-r--r-- src/rabbit_policies.erl 12
-rw-r--r-- src/rabbit_priority_queue.erl 7
-rw-r--r-- src/rabbit_variable_queue.erl 283
6 files changed, 306 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 452047fdb2..ee331180ed 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -318,7 +318,8 @@ process_args_policy(State = #q{q = Q,
{<<"dead-letter-routing-key">>, fun res_arg/2, fun init_dlx_rkey/2},
{<<"message-ttl">>, fun res_min/2, fun init_ttl/2},
{<<"max-length">>, fun res_min/2, fun init_max_length/2},
- {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2}],
+ {<<"max-length-bytes">>, fun res_min/2, fun init_max_bytes/2},
+ {<<"queue-mode">>, fun res_arg/2, fun init_queue_mode/2}],
drop_expired_msgs(
lists:foldl(fun({Name, Resolve, Fun}, StateN) ->
Fun(args_policy_lookup(Name, Resolve, Q), StateN)
@@ -361,6 +362,13 @@ init_max_bytes(MaxBytes, State) ->
{_Dropped, State1} = maybe_drop_head(State#q{max_bytes = MaxBytes}),
State1.
+init_queue_mode(undefined, State) ->
+ State;
+init_queue_mode(Mode, State = #q {backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:set_queue_mode(binary_to_existing_atom(Mode, utf8), BQS),
+ State#q{backing_queue_state = BQS1}.
+
reply(Reply, NewState) ->
{NewState1, Timeout} = next_state(NewState),
{reply, Reply, ensure_stats_timer(ensure_rate_timer(NewState1)), Timeout}.
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index ee3a097a80..e63ee107b0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -23,7 +23,7 @@
len/1, is_empty/1, depth/1, drain_confirmed/1,
dropwhile/2, fetchwhile/4, set_ram_duration_target/2, ram_duration/1,
needs_timeout/1, timeout/1, handle_pre_hibernate/1, resume/1,
- msg_rates/1, info/2, invoke/3, is_duplicate/2]).
+ msg_rates/1, info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
-export([start/1, stop/0, delete_crashed/1]).
@@ -485,6 +485,13 @@ is_duplicate(Message = #basic_message { id = MsgId },
confirmed = [MsgId | Confirmed] }}
end.
+set_queue_mode(Mode, State = #state { gm = GM,
+ backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ ok = gm:broadcast(GM, {set_queue_mode, Mode}),
+ BQS1 = BQ:set_queue_mode(Mode, BQS),
+ State #state { backing_queue_state = BQS1 }.
+
%% ---------------------------------------------------------------------------
%% Other exported functions
%% ---------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index d68852f336..225c21dd54 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -942,7 +942,12 @@ process_instruction({delete_and_terminate, Reason},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, State #state { backing_queue_state = undefined }}.
+ {stop, State #state { backing_queue_state = undefined }};
+process_instruction({set_queue_mode, Mode},
+ State = #state { backing_queue = BQ,
+ backing_queue_state = BQS }) ->
+ BQS1 = BQ:set_queue_mode(Mode, BQS),
+ {ok, State #state { backing_queue_state = BQS1 }}.
maybe_flow_ack(Sender, flow) -> credit_flow:ack(Sender);
maybe_flow_ack(_Sender, noflow) -> ok.
diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl
index 65f3801e3e..a4e1e9be4a 100644
--- a/src/rabbit_policies.erl
+++ b/src/rabbit_policies.erl
@@ -35,7 +35,8 @@ register() ->
{policy_validator, <<"message-ttl">>},
{policy_validator, <<"expires">>},
{policy_validator, <<"max-length">>},
- {policy_validator, <<"max-length-bytes">>}]],
+ {policy_validator, <<"max-length-bytes">>},
+ {policy_validator, <<"queue-mode">>}]],
ok.
validate_policy(Terms) ->
@@ -83,4 +84,11 @@ validate_policy0(<<"max-length-bytes">>, Value)
when is_integer(Value), Value >= 0 ->
ok;
validate_policy0(<<"max-length-bytes">>, Value) ->
- {error, "~p is not a valid maximum length in bytes", [Value]}.
+ {error, "~p is not a valid maximum length in bytes", [Value]};
+
+validate_policy0(<<"queue-mode">>, <<"default">>) ->
+ ok;
+validate_policy0(<<"queue-mode">>, <<"lazy">>) ->
+ ok;
+validate_policy0(<<"queue-mode">>, Value) ->
+ {error, "~p is not a valid queue-mode value", [Value]}.
diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl
index 4d638b334a..46a3991d88 100644
--- a/src/rabbit_priority_queue.erl
+++ b/src/rabbit_priority_queue.erl
@@ -40,7 +40,7 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2]).
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2]).
%% for rabbit_mirror_queue_sync.
-export([partition_publish_delivered_batch/1]).
@@ -430,6 +430,11 @@ is_duplicate(Msg, State = #state{bq = BQ}) ->
is_duplicate(Msg, State = #passthrough{bq = BQ, bqs = BQS}) ->
?passthrough2(is_duplicate(Msg, BQS)).
+set_queue_mode(Mode, State = #state{bq = BQ}) ->
+ foreach1(fun (_P, BQSN) -> BQ:set_queue_mode(Mode, BQSN) end, State);
+set_queue_mode(Mode, State = #passthrough{bq = BQ, bqs = BQS}) ->
+ ?passthrough1(set_queue_mode(Mode, BQS)).
+
%%----------------------------------------------------------------------------
bq() ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index d1f45ade4f..19878580db 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -25,7 +25,8 @@
ackfold/4, fold/3, len/1, is_empty/1, depth/1,
set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
handle_pre_hibernate/1, resume/1, msg_rates/1,
- info/2, invoke/3, is_duplicate/2, multiple_routing_keys/0]).
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ multiple_routing_keys/0]).
-export([start/1, stop/0]).
@@ -302,7 +303,10 @@
disk_read_count,
disk_write_count,
- io_batch_size
+ io_batch_size,
+
+ %% default queue or lazy queue
+ mode
}).
-record(rates, { in, out, ack_in, ack_out, timestamp }).
@@ -399,7 +403,8 @@
disk_read_count :: non_neg_integer(),
disk_write_count :: non_neg_integer(),
- io_batch_size :: pos_integer()}).
+ io_batch_size :: pos_integer(),
+ mode :: 'default' | 'lazy' }).
%% Duplicated from rabbit_backing_queue
-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
@@ -667,7 +672,8 @@ ack(AckTags, State) ->
a(State1 #vqstate { index_state = IndexState1,
ack_out_counter = AckOutCount + length(AckTags) })}.
-requeue(AckTags, #vqstate { delta = Delta,
+requeue(AckTags, #vqstate { mode = default,
+ delta = Delta,
q3 = Q3,
q4 = Q4,
in_counter = InCounter,
@@ -687,6 +693,23 @@ requeue(AckTags, #vqstate { delta = Delta,
q3 = Q3a,
q4 = Q4a,
in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))};
+requeue(AckTags, #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
+ delta_limit(Delta),
+ fun publish_beta/2, State),
+ {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
+ State1),
+ MsgCount = length(MsgIds1),
+ {MsgIds1, a(reduce_memory_use(
+ maybe_update_rates(
+ State2 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ in_counter = InCounter + MsgCount,
len = Len + MsgCount })))}.
ackfold(MsgFun, Acc, State, AckTags) ->
@@ -852,6 +875,7 @@ info(disk_writes, #vqstate{disk_write_count = Count}) ->
Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = Mode,
len = Len,
target_ram_count = TargetRamCount,
next_seq_id = NextSeqId,
@@ -860,7 +884,8 @@ info(backing_queue_status, #vqstate {
ack_in = AvgAckIngressRate,
ack_out = AvgAckEgressRate }}) ->
- [ {q1 , ?QUEUE:len(Q1)},
+ [ {mode , Mode},
+ {q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{delta , Delta},
{q3 , ?QUEUE:len(Q3)},
@@ -880,6 +905,51 @@ invoke( _, _, State) -> State.
is_duplicate(_Msg, State) -> {false, State}.
+set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
+ State;
+set_queue_mode(lazy, State = #vqstate {
+ target_ram_count = TargetRamCount }) ->
+ %% To become a lazy queue we need to page everything to disk first.
+ State1 = convert_to_lazy(State),
+ %% restore the original target_ram_count
+ a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
+set_queue_mode(default, State) ->
+ %% becoming a default queue means loading messages from disk like
+ %% when a queue is recovered.
+ a(maybe_deltas_to_betas(State #vqstate { mode = default }));
+set_queue_mode(_, State) ->
+ State.
+
+convert_to_lazy(State) ->
+ State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
+ set_ram_duration_target(0, State),
+ case Delta#delta.count + ?QUEUE:len(Q3) == Len of
+ true ->
+ State1;
+ false ->
+ %% When pushing messages to disk, we might have been
+ %% blocked by the msg_store, so we need to see if we have
+ %% to wait for more credit, and then keep paging messages.
+ %%
+ %% The amqqueue_process could have taken care of this, but
+ %% between the time it receives the bump_credit msg and
+ %% calls BQ:resume to keep paging messages to disk, some
+ %% other request may arrive to the BQ which at this moment
+ %% is not in a proper state for a lazy BQ (unless all
+ %% messages have been paged to disk already).
+ wait_for_msg_store_credit(),
+ convert_to_lazy(State1)
+ end.
+
+wait_for_msg_store_credit() ->
+ case credit_flow:blocked() of
+ true -> receive
+ {bump_credit, Msg} ->
+ credit_flow:handle_bump_msg(Msg)
+ end;
+ false -> ok
+ end.
+
%% Get the Timestamp property of the first msg, if present. This is
%% the one with the oldest timestamp among the heads of the pending
%% acks and unread queues. We can't check disk_pending_acks as these
@@ -935,8 +1005,8 @@ get_collection_head(Col, IsEmpty, GetVal) ->
%%----------------------------------------------------------------------------
%% Minor helpers
%%----------------------------------------------------------------------------
-
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = default,
len = Len,
bytes = Bytes,
unacked_bytes = UnackedBytes,
@@ -951,9 +1021,16 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
E4 = ?QUEUE:is_empty(Q4),
LZ = Len == 0,
+ %% if q1 has messages then q3 cannot be empty. See publish/6.
true = E1 or not E3,
+ %% if q2 has messages then we have messages in delta (paged to
+ %% disk). See push_alphas_to_betas/2.
true = E2 or not ED,
+ %% if delta has messages then q3 cannot be empty. This is enforced
+ %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages
+ %% are always kept in RAM.
true = ED or not E3,
+ %% if the queue length is 0, then q3 and q4 must be empty.
true = LZ == (E3 and E4),
true = Len >= 0,
@@ -966,6 +1043,53 @@ a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
true = RamBytes >= 0,
true = RamBytes =< Bytes + UnackedBytes,
+ State;
+a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = lazy,
+ len = Len,
+ bytes = Bytes,
+ unacked_bytes = UnackedBytes,
+ persistent_count = PersistentCount,
+ persistent_bytes = PersistentBytes,
+ ram_msg_count = RamMsgCount,
+ ram_bytes = RamBytes}) ->
+ E1 = ?QUEUE:is_empty(Q1),
+ E2 = ?QUEUE:is_empty(Q2),
+ ED = Delta#delta.count == 0,
+ E3 = ?QUEUE:is_empty(Q3),
+ E4 = ?QUEUE:is_empty(Q4),
+ LZ = Len == 0,
+ L3 = ?QUEUE:len(Q3),
+
+ %% q1 must always be empty, since q1 only gets messages during
+ %% publish, but for lazy queues messages go straight to delta.
+ true = E1,
+
+ %% q2 only gets messages from q1 when push_alphas_to_betas is
+ %% called for a non empty delta, which won't be the case for a
+ %% lazy queue. This means q2 must always be empty.
+ true = E2,
+
+ %% q4 must always be empty, since for lazy queues published messages
+ %% go straight to delta and requeued messages go to q3 or delta.
+ true = E4,
+
+ %% if the queue is empty, then delta is empty and q3 is empty.
+ true = LZ == (ED and E3),
+
+ %% Every message must be in delta or q3, i.e. none in q1, q2, or q4.
+ true = Delta#delta.count + L3 == Len,
+
+ true = Len >= 0,
+ true = Bytes >= 0,
+ true = UnackedBytes >= 0,
+ true = PersistentCount >= 0,
+ true = PersistentBytes >= 0,
+ true = RamMsgCount >= 0,
+ true = RamMsgCount =< Len,
+ true = RamBytes >= 0,
+ true = RamBytes =< Bytes + UnackedBytes,
+
State.
d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
@@ -1203,7 +1327,9 @@ init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
disk_read_count = 0,
disk_write_count = 0,
- io_batch_size = IoBatchSize },
+ io_batch_size = IoBatchSize,
+
+ mode = default },
a(maybe_deltas_to_betas(State)).
blank_rates(Now) ->
@@ -1214,7 +1340,7 @@ blank_rates(Now) ->
timestamp = Now}.
in_r(MsgStatus = #msg_status { msg = undefined },
- State = #vqstate { q3 = Q3, q4 = Q4 }) ->
+ State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
case ?QUEUE:is_empty(Q4) of
true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
@@ -1223,10 +1349,24 @@ in_r(MsgStatus = #msg_status { msg = undefined },
stats(ready0, {MsgStatus, MsgStatus1},
State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
end;
-in_r(MsgStatus, State = #vqstate { q4 = Q4 }) ->
- State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) }.
+in_r(MsgStatus,
+ State = #vqstate { mode = default, q4 = Q4 }) ->
+ State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
+%% lazy queues
+in_r(MsgStatus = #msg_status { seq_id = SeqId },
+ State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
+ case ?QUEUE:is_empty(Q3) of
+ true ->
+ {_MsgStatus1, State1} =
+ maybe_write_to_disk(true, true, MsgStatus, State),
+ State2 = stats(ready0, {MsgStatus, none}, State1),
+ Delta1 = expand_delta(SeqId, Delta),
+ State2 #vqstate{ delta = Delta1 };
+ false ->
+ State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
+ end.
-queue_out(State = #vqstate { q4 = Q4 }) ->
+queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
case ?QUEUE:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
@@ -1235,6 +1375,12 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
end;
{{value, MsgStatus}, Q4a} ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ end;
+%% lazy queues
+queue_out(State = #vqstate { mode = lazy }) ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
end.
read_msg(#msg_status{msg = undefined,
@@ -1254,11 +1400,13 @@ read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
stats(Signs, Statuses, State) ->
stats0(expand_signs(Signs), expand_statuses(Statuses), State).
-expand_signs(ready0) -> {0, 0, true};
-expand_signs({A, B}) -> {A, B, false}.
+expand_signs(ready0) -> {0, 0, true};
+expand_signs(lazy_pub) -> {1, 0, true};
+expand_signs({A, B}) -> {A, B, false}.
expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
+expand_statuses({lazy, A}) -> {false , false, A};
expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
%% In this function at least, we are religious: the variable name
@@ -1546,10 +1694,12 @@ process_delivers_and_acks_fun(_) ->
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
+
publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
MsgProps = #message_properties { needs_confirming = NeedsConfirming },
IsDelivered, _ChPid, _Flow, PersistFun,
State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ mode = default,
qi_embed_msgs_below = IndexMaxSize,
next_seq_id = SeqId,
in_counter = InCount,
@@ -1567,6 +1717,26 @@ publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
stats({1, 0}, {none, MsgStatus1},
State2#vqstate{ next_seq_id = SeqId + 1,
in_counter = InCount1,
+ unconfirmed = UC1 });
+publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
+ MsgProps = #message_properties { needs_confirming = NeedsConfirming },
+ IsDelivered, _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = lazy,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC,
+ delta = Delta }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ Delta1 = expand_delta(SeqId, Delta),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ stats(lazy_pub, {lazy, m(MsgStatus1)},
+ State1#vqstate{ delta = Delta1,
+ next_seq_id = SeqId + 1,
+ in_counter = InCount + 1,
unconfirmed = UC1 }).
batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
@@ -1578,7 +1748,8 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
MsgProps = #message_properties {
needs_confirming = NeedsConfirming },
_ChPid, _Flow, PersistFun,
- State = #vqstate { qi_embed_msgs_below = IndexMaxSize,
+ State = #vqstate { mode = default,
+ qi_embed_msgs_below = IndexMaxSize,
next_seq_id = SeqId,
out_counter = OutCount,
in_counter = InCount,
@@ -1594,6 +1765,29 @@ publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
out_counter = OutCount + 1,
in_counter = InCount + 1,
unconfirmed = UC1 }),
+ {SeqId, State3};
+publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
+ id = MsgId },
+ MsgProps = #message_properties {
+ needs_confirming = NeedsConfirming },
+ _ChPid, _Flow, PersistFun,
+ State = #vqstate { mode = lazy,
+ qi_embed_msgs_below = IndexMaxSize,
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ durable = IsDurable,
+ unconfirmed = UC }) ->
+ IsPersistent1 = IsDurable andalso IsPersistent,
+ MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
+ {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
+ State2 = record_pending_ack(m(MsgStatus1), State1),
+ UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
+ State3 = stats({0, 1}, {none, MsgStatus1},
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1,
+ unconfirmed = UC1 }),
{SeqId, State3}.
batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
@@ -2063,6 +2257,7 @@ ifold(Fun, Acc, Its, State) ->
reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
State;
reduce_memory_use(State = #vqstate {
+ mode = default,
ram_pending_ack = RPA,
ram_msg_count = RamMsgCount,
target_ram_count = TargetRamCount,
@@ -2108,6 +2303,30 @@ reduce_memory_use(State = #vqstate {
end,
%% See rabbitmq-server-290 for the reasons behind this GC call.
garbage_collect(),
+ State3;
+%% When using lazy queues, there are no alphas, so we don't need to
+%% call push_alphas_to_betas/2.
+reduce_memory_use(State = #vqstate {
+ mode = lazy,
+ ram_pending_ack = RPA,
+ ram_msg_count = RamMsgCount,
+ target_ram_count = TargetRamCount }) ->
+ State1 = #vqstate { q3 = Q3 } =
+ case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+ 0 -> State;
+ S1 -> {_, State2} = limit_ram_acks(S1, State),
+ State2
+ end,
+
+ State3 =
+ case chunk_size(?QUEUE:len(Q3),
+ permitted_beta_count(State1)) of
+ 0 ->
+ State1;
+ S2 ->
+ push_betas_to_deltas(S2, State1)
+ end,
+ garbage_collect(),
State3.
limit_ram_acks(0, State) ->
@@ -2131,6 +2350,9 @@ limit_ram_acks(Quota, State = #vqstate { ram_pending_ack = RPA,
permitted_beta_count(#vqstate { len = 0 }) ->
infinity;
+permitted_beta_count(#vqstate { mode = lazy,
+ target_ram_count = TargetRamCount}) ->
+ TargetRamCount;
permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
permitted_beta_count(#vqstate { q1 = Q1,
@@ -2148,7 +2370,8 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
Current - Permitted.
-fetch_from_q3(State = #vqstate { q1 = Q1,
+fetch_from_q3(State = #vqstate { mode = default,
+ q1 = Q1,
q2 = Q2,
delta = #delta { count = DeltaCount },
q3 = Q3,
@@ -2178,6 +2401,19 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
State1
end,
{loaded, {MsgStatus, State2}}
+ end;
+%% lazy queues
+fetch_from_q3(State = #vqstate { mode = lazy,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3 }) ->
+ case ?QUEUE:out(Q3) of
+ {empty, _Q3} when DeltaCount =:= 0 ->
+ {empty, State};
+ {empty, _Q3} ->
+ fetch_from_q3(maybe_deltas_to_betas(State));
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ {loaded, {MsgStatus, State1}}
end.
maybe_deltas_to_betas(State) ->
@@ -2286,7 +2522,8 @@ push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
end
end.
-push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
+push_betas_to_deltas(Quota, State = #vqstate { mode = default,
+ q2 = Q2,
delta = Delta,
q3 = Q3}) ->
PushState = {Quota, Delta, State},
@@ -2301,8 +2538,22 @@ push_betas_to_deltas(Quota, State = #vqstate { q2 = Q2,
{_, Delta1, State1} = PushState2,
State1 #vqstate { q2 = Q2a,
delta = Delta1,
+ q3 = Q3a };
+%% In the case of lazy queues we want to page as many messages as
+%% possible from q3.
+push_betas_to_deltas(Quota, State = #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3}) ->
+ PushState = {Quota, Delta, State},
+ {Q3a, PushState1} = push_betas_to_deltas(
+ fun ?QUEUE:out_r/1,
+ fun (Q2MinSeqId) -> Q2MinSeqId end,
+ Q3, PushState),
+ {_, Delta1, State1} = PushState1,
+ State1 #vqstate { delta = Delta1,
q3 = Q3a }.
+
push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
case ?QUEUE:is_empty(Q) of
true ->