diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-06-03 14:07:28 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-06-03 14:07:28 +0100 |
| commit | d4c5750f8663fc4fb899ecd108226c3d735487f1 (patch) | |
| tree | e7308baa91d0381ee21892be465e784d28597274 | |
| parent | 043a2932acbc8e4bd7554a8a6f2209c81779b163 (diff) | |
| download | rabbitmq-server-git-d4c5750f8663fc4fb899ecd108226c3d735487f1.tar.gz | |
refactor: introduce helper fun
| -rw-r--r-- | src/rabbit_variable_queue.erl | 87 |
1 files changed, 38 insertions, 49 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 3e66e000b2..81f346f95a 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1163,47 +1163,31 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, in_counter = InCount + 1, persistent_count = PCount1 })}. -publish(msg, MsgStatus, #vqstate { - index_state = IndexState, ram_msg_count = RamMsgCount, - msg_store_clients = MSCState } = State) -> - {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(false, MsgStatus, MSCState), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(false, MsgStatus1, IndexState), - State1 = State #vqstate { ram_msg_count = RamMsgCount + 1, - index_state = IndexState1, - msg_store_clients = MSCState1 }, - store_alpha_entry(MsgStatus2, State1); - -publish(index, MsgStatus, #vqstate { - ram_index_count = RamIndexCount, msg_store_clients = MSCState, - index_state = IndexState, q1 = Q1 } = State) -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(true, MsgStatus, MSCState), +publish(msg, MsgStatus, State) -> + {MsgStatus1, State1 = #vqstate { ram_msg_count = RamMsgCount }} = + maybe_write_to_disk(false, false, MsgStatus, State), + State2 = State1 # vqstate {ram_msg_count = RamMsgCount + 1 }, + store_alpha_entry(MsgStatus1, State2); + +publish(index, MsgStatus, State) -> ForceIndex = should_force_index_to_disk(State), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), - IndexOnDisk = MsgStatus2 #msg_status.index_on_disk, + {MsgStatus1 = #msg_status { msg_on_disk = true, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { ram_index_count = RamIndexCount, q1 = Q1 }} = + maybe_write_to_disk(true, ForceIndex, MsgStatus, State), RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk), - State1 = State #vqstate { index_state = IndexState1, - ram_index_count = RamIndexCount1, - msg_store_clients = MSCState1 }, + State2 = State1 #vqstate { ram_index_count = RamIndexCount1 }, true = queue:is_empty(Q1), %% ASSERTION - store_beta_entry(MsgStatus2, State1); - -publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State = - #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2, - delta = Delta, msg_store_clients = MSCState }) -> - {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} = - maybe_write_msg_to_disk(true, MsgStatus, MSCState), - {#msg_status { index_on_disk = true }, IndexState1} = - maybe_write_index_to_disk(true, MsgStatus1, IndexState), + store_beta_entry(MsgStatus1, State2); + +publish(neither, MsgStatus, State) -> + {#msg_status { msg_on_disk = true, index_on_disk = true, seq_id = SeqId }, + State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} = + maybe_write_to_disk(true, true, MsgStatus, State), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION Delta1 = #delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 }, - State #vqstate { index_state = IndexState1, - delta = combine_deltas(Delta, Delta1), - msg_store_clients = MSCState1 }. + State1 #vqstate { delta = combine_deltas(Delta, Delta1) }. store_alpha_entry(MsgStatus, State = #vqstate { q1 = Q1, q2 = Q2, @@ -1285,6 +1269,16 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. +maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, + State = #vqstate { msg_store_clients = MSCState, + index_state = IndexState }) -> + {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( + ForceMsg, MsgStatus, MSCState), + {MsgStatus2, IndexState1} = maybe_write_index_to_disk( + ForceIndex, MsgStatus1, IndexState), + {MsgStatus2, State #vqstate { index_state = IndexState1, + msg_store_clients = MSCState1 }}. + %%---------------------------------------------------------------------------- %% Phase changes %%---------------------------------------------------------------------------- @@ -1418,26 +1412,21 @@ maybe_push_alphas_to_betas( target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; -maybe_push_alphas_to_betas( - Generator, Consumer, Q, State = - #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount, - index_state = IndexState, msg_store_clients = MSCState }) -> +maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> case Generator(Q) of {empty, _Q} -> State; {{value, MsgStatus}, Qa} -> - {MsgStatus1, MSCState1} = - maybe_write_msg_to_disk(true, MsgStatus, MSCState), ForceIndex = should_force_index_to_disk(State), - {MsgStatus2, IndexState1} = - maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState), - IndexOnDisk = MsgStatus2 #msg_status.index_on_disk, + {MsgStatus1 = #msg_status { msg_on_disk = true, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount }} = + maybe_write_to_disk(true, ForceIndex, MsgStatus, State), RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk), - State1 = State #vqstate { ram_msg_count = RamMsgCount - 1, - ram_index_count = RamIndexCount1, - index_state = IndexState1, - msg_store_clients = MSCState1 }, + State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1, + ram_index_count = RamIndexCount1 }, maybe_push_alphas_to_betas(Generator, Consumer, Qa, - Consumer(MsgStatus2, Qa, State1)) + Consumer(MsgStatus1, Qa, State2)) end. push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, |
