diff options
| author | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 17:53:43 +0200 |
|---|---|---|
| committer | Alvaro Videla <videlalvaro@gmail.com> | 2015-10-10 17:53:43 +0200 |
| commit | a3490dc3acd3a624f0cc51ece84ebcd65fb9235e (patch) | |
| tree | 084733e8119cf72e2d6f56b6713d217f74390ff3 /src | |
| parent | 69554ab2789e33ff09f514cb9457f92f674347f3 (diff) | |
| download | rabbitmq-server-git-a3490dc3acd3a624f0cc51ece84ebcd65fb9235e.tar.gz | |
queue_out/1 for lazy queues
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 24 |
1 files changed, 22 insertions, 2 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index eb2e86098a..317eeaf710 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1354,7 +1354,7 @@ in_r(MsgStatus = #msg_status { seq_id = SeqId }, State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) } end. -queue_out(State = #vqstate { q4 = Q4 }) -> +queue_out(State = #vqstate { mode = default, q4 = Q4 }) -> case ?QUEUE:out(Q4) of {empty, _Q4} -> case fetch_from_q3(State) of @@ -1363,6 +1363,12 @@ queue_out(State = #vqstate { q4 = Q4 }) -> end; {{value, MsgStatus}, Q4a} -> {{value, MsgStatus}, State #vqstate { q4 = Q4a }} + end; +%% lazy queues +queue_out(State = #vqstate { mode = lazy }) -> + case fetch_from_q3(State) of + {empty, _State1} = Result -> Result; + {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1} end. read_msg(#msg_status{msg = undefined, @@ -2248,7 +2254,8 @@ chunk_size(Current, Permitted) chunk_size(Current, Permitted) -> Current - Permitted. -fetch_from_q3(State = #vqstate { q1 = Q1, +fetch_from_q3(State = #vqstate { mode = default, + q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, q3 = Q3, @@ -2278,6 +2285,19 @@ fetch_from_q3(State = #vqstate { q1 = Q1, State1 end, {loaded, {MsgStatus, State2}} + end; +%% lazy queues +fetch_from_q3(State = #vqstate { mode = lazy, + delta = #delta { count = DeltaCount }, + q3 = Q3 }) -> + case ?QUEUE:out(Q3) of + {empty, _Q3} when DeltaCount =:= 0 -> + {empty, State}; + {empty, _Q3} -> + fetch_from_q3(maybe_deltas_to_betas(State)); + {{value, MsgStatus}, Q3a} -> + State1 = State #vqstate { q3 = Q3a }, + {loaded, {MsgStatus, State1}} end. maybe_deltas_to_betas(State) -> |
