summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:53:43 +0200
committerAlvaro Videla <videlalvaro@gmail.com>2015-10-10 17:53:43 +0200
commita3490dc3acd3a624f0cc51ece84ebcd65fb9235e (patch)
tree084733e8119cf72e2d6f56b6713d217f74390ff3 /src
parent69554ab2789e33ff09f514cb9457f92f674347f3 (diff)
downloadrabbitmq-server-git-a3490dc3acd3a624f0cc51ece84ebcd65fb9235e.tar.gz
queue_out/1 for lazy queues
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl24
1 files changed, 22 insertions, 2 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index eb2e86098a..317eeaf710 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1354,7 +1354,7 @@ in_r(MsgStatus = #msg_status { seq_id = SeqId },
State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
end.
-queue_out(State = #vqstate { q4 = Q4 }) ->
+queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
case ?QUEUE:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3(State) of
@@ -1363,6 +1363,12 @@ queue_out(State = #vqstate { q4 = Q4 }) ->
end;
{{value, MsgStatus}, Q4a} ->
{{value, MsgStatus}, State #vqstate { q4 = Q4a }}
+ end;
+%% lazy queues
+queue_out(State = #vqstate { mode = lazy }) ->
+ case fetch_from_q3(State) of
+ {empty, _State1} = Result -> Result;
+ {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
end.
read_msg(#msg_status{msg = undefined,
@@ -2248,7 +2254,8 @@ chunk_size(Current, Permitted)
chunk_size(Current, Permitted) ->
Current - Permitted.
-fetch_from_q3(State = #vqstate { q1 = Q1,
+fetch_from_q3(State = #vqstate { mode = default,
+ q1 = Q1,
q2 = Q2,
delta = #delta { count = DeltaCount },
q3 = Q3,
@@ -2278,6 +2285,19 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
State1
end,
{loaded, {MsgStatus, State2}}
+ end;
+%% lazy queues
+fetch_from_q3(State = #vqstate { mode = lazy,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3 }) ->
+ case ?QUEUE:out(Q3) of
+ {empty, _Q3} when DeltaCount =:= 0 ->
+ {empty, State};
+ {empty, _Q3} ->
+ fetch_from_q3(maybe_deltas_to_betas(State));
+ {{value, MsgStatus}, Q3a} ->
+ State1 = State #vqstate { q3 = Q3a },
+ {loaded, {MsgStatus, State1}}
end.
maybe_deltas_to_betas(State) ->