diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
1 files changed, 11 insertions, 18 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index bd91283750..fcebdddcec 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1223,20 +1223,14 @@ maybe_deltas_to_betas(State = #vqstate { maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, - fun (MsgStatus = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - Q1a, State1 = #vqstate { q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> - MsgStatus1 = MsgStatus #msg_status { msg = undefined }, - case DeltaCount == 0 of - true -> State1 #vqstate { - q1 = Q1a, - q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) }; - false -> State1 #vqstate { - q1 = Q1a, - q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } - end + fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, + Q1a, State1 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) -> + State1 #vqstate { q1 = Q1a, + q3 = bpqueue:in(IndexOnDisk, MsgStatus, Q3) }; + (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, + Q1a, State1 = #vqstate { q2 = Q2, delta = #delta {} }) -> + State1 #vqstate { q1 = Q1a, + q2 = bpqueue:in(IndexOnDisk, MsgStatus, Q2) } end, Q1, State). maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> @@ -1244,9 +1238,7 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> fun queue:out_r/1, fun (MsgStatus = #msg_status { index_on_disk = IndexOnDisk }, Q4a, State1 = #vqstate { q3 = Q3 }) -> - MsgStatus1 = MsgStatus #msg_status { msg = undefined }, - %% these must go to q3 - State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus1, Q3), + State1 #vqstate { q3 = bpqueue:in_r(IndexOnDisk, MsgStatus, Q3), q4 = Q4a } end, Q4, State). @@ -1266,11 +1258,12 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = maybe_write_to_disk(true, ForceIndex, MsgStatus, State), + MsgStatus2 = MsgStatus1 #msg_status { msg = undefined }, RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, ram_index_count = RamIndexCount1 }, maybe_push_alphas_to_betas(Generator, Consumer, Qa, - Consumer(MsgStatus1, Qa, State2)) + Consumer(MsgStatus2, Qa, State2)) end. push_betas_to_deltas(State = #vqstate { q2 = Q2, |
