summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-06-03 14:07:28 +0100
committerMatthias Radestock <matthias@lshift.net>2010-06-03 14:07:28 +0100
commitd4c5750f8663fc4fb899ecd108226c3d735487f1 (patch)
treee7308baa91d0381ee21892be465e784d28597274
parent043a2932acbc8e4bd7554a8a6f2209c81779b163 (diff)
downloadrabbitmq-server-git-d4c5750f8663fc4fb899ecd108226c3d735487f1.tar.gz
refactor: introduce helper fun
-rw-r--r--src/rabbit_variable_queue.erl87
1 files changed, 38 insertions, 49 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 3e66e000b2..81f346f95a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1163,47 +1163,31 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
in_counter = InCount + 1,
persistent_count = PCount1 })}.
-publish(msg, MsgStatus, #vqstate {
- index_state = IndexState, ram_msg_count = RamMsgCount,
- msg_store_clients = MSCState } = State) ->
- {MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(false, MsgStatus, MSCState),
- {MsgStatus2, IndexState1} =
- maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- State1 = State #vqstate { ram_msg_count = RamMsgCount + 1,
- index_state = IndexState1,
- msg_store_clients = MSCState1 },
- store_alpha_entry(MsgStatus2, State1);
-
-publish(index, MsgStatus, #vqstate {
- ram_index_count = RamIndexCount, msg_store_clients = MSCState,
- index_state = IndexState, q1 = Q1 } = State) ->
- {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(true, MsgStatus, MSCState),
+publish(msg, MsgStatus, State) ->
+ {MsgStatus1, State1 = #vqstate { ram_msg_count = RamMsgCount }} =
+ maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = State1 # vqstate {ram_msg_count = RamMsgCount + 1 },
+ store_alpha_entry(MsgStatus1, State2);
+
+publish(index, MsgStatus, State) ->
ForceIndex = should_force_index_to_disk(State),
- {MsgStatus2, IndexState1} =
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
- IndexOnDisk = MsgStatus2 #msg_status.index_on_disk,
+ {MsgStatus1 = #msg_status { msg_on_disk = true,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { ram_index_count = RamIndexCount, q1 = Q1 }} =
+ maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk),
- State1 = State #vqstate { index_state = IndexState1,
- ram_index_count = RamIndexCount1,
- msg_store_clients = MSCState1 },
+ State2 = State1 #vqstate { ram_index_count = RamIndexCount1 },
true = queue:is_empty(Q1), %% ASSERTION
- store_beta_entry(MsgStatus2, State1);
-
-publish(neither, MsgStatus = #msg_status { seq_id = SeqId }, State =
- #vqstate { index_state = IndexState, q1 = Q1, q2 = Q2,
- delta = Delta, msg_store_clients = MSCState }) ->
- {MsgStatus1 = #msg_status { msg_on_disk = true }, MSCState1} =
- maybe_write_msg_to_disk(true, MsgStatus, MSCState),
- {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(true, MsgStatus1, IndexState),
+ store_beta_entry(MsgStatus1, State2);
+
+publish(neither, MsgStatus, State) ->
+ {#msg_status { msg_on_disk = true, index_on_disk = true, seq_id = SeqId },
+ State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} =
+ maybe_write_to_disk(true, true, MsgStatus, State),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
Delta1 = #delta { start_seq_id = SeqId, count = 1,
end_seq_id = SeqId + 1 },
- State #vqstate { index_state = IndexState1,
- delta = combine_deltas(Delta, Delta1),
- msg_store_clients = MSCState1 }.
+ State1 #vqstate { delta = combine_deltas(Delta, Delta1) }.
store_alpha_entry(MsgStatus, State =
#vqstate { q1 = Q1, q2 = Q2,
@@ -1285,6 +1269,16 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
+ State = #vqstate { msg_store_clients = MSCState,
+ index_state = IndexState }) ->
+ {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
+ ForceMsg, MsgStatus, MSCState),
+ {MsgStatus2, IndexState1} = maybe_write_index_to_disk(
+ ForceIndex, MsgStatus1, IndexState),
+ {MsgStatus2, State #vqstate { index_state = IndexState1,
+ msg_store_clients = MSCState1 }}.
+
%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1418,26 +1412,21 @@ maybe_push_alphas_to_betas(
target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
-maybe_push_alphas_to_betas(
- Generator, Consumer, Q, State =
- #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount,
- index_state = IndexState, msg_store_clients = MSCState }) ->
+maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
case Generator(Q) of
{empty, _Q} -> State;
{{value, MsgStatus}, Qa} ->
- {MsgStatus1, MSCState1} =
- maybe_write_msg_to_disk(true, MsgStatus, MSCState),
ForceIndex = should_force_index_to_disk(State),
- {MsgStatus2, IndexState1} =
- maybe_write_index_to_disk(ForceIndex, MsgStatus1, IndexState),
- IndexOnDisk = MsgStatus2 #msg_status.index_on_disk,
+ {MsgStatus1 = #msg_status { msg_on_disk = true,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount }} =
+ maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk),
- State1 = State #vqstate { ram_msg_count = RamMsgCount - 1,
- ram_index_count = RamIndexCount1,
- index_state = IndexState1,
- msg_store_clients = MSCState1 },
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
+ ram_index_count = RamIndexCount1 },
maybe_push_alphas_to_betas(Generator, Consumer, Qa,
- Consumer(MsgStatus2, Qa, State1))
+ Consumer(MsgStatus1, Qa, State2))
end.
push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,