diff options
| author | Matthias Radestock <matthias@lshift.net> | 2010-06-03 22:16:23 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@lshift.net> | 2010-06-03 22:16:23 +0100 |
| commit | c344088af6adc0d7e7027e2e13afd4e70c57d180 (patch) | |
| tree | da704be0c7b802190f4b10288ea7c3df470a3502 /src | |
| parent | 2ef796e992cdaf6637a098eae0d011fab6ac51a1 (diff) | |
| download | rabbitmq-server-git-c344088af6adc0d7e7027e2e13afd4e70c57d180.tar.gz | |
cosmetic
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_variable_queue.erl | 386 |
1 files changed, 212 insertions, 174 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index c0aecd1b30..b2eb4dc4eb 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -235,8 +235,8 @@ -type(ack() :: seq_id() | 'blank_ack'). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), - count :: non_neg_integer (), - end_seq_id :: non_neg_integer() }). + count :: non_neg_integer (), + end_seq_id :: non_neg_integer() }). -type(state() :: #vqstate { q1 :: queue(), @@ -276,11 +276,11 @@ -endif. -define(BLANK_DELTA, #delta { start_seq_id = undefined, - count = 0, - end_seq_id = undefined }). + count = 0, + end_seq_id = undefined }). -define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z, - count = 0, - end_seq_id = Z }). + count = 0, + end_seq_id = Z }). %%---------------------------------------------------------------------------- %% Public API @@ -323,8 +323,8 @@ init(QueueName, IsDurable, _Recover) -> Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of true -> ?BLANK_DELTA; false -> #delta { start_seq_id = LowSeqId, - count = DeltaCount1, - end_seq_id = NextSeqId } + count = DeltaCount1, + end_seq_id = NextSeqId } end, Now = now(), PersistentClient = @@ -365,9 +365,10 @@ init(QueueName, IsDurable, _Recover) -> maybe_deltas_to_betas(State). terminate(State) -> - State1 = #vqstate { - persistent_count = PCount, index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } = + State1 = #vqstate { persistent_count = PCount, + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, + {MSCStateT, TRef}} } = remove_pending_ack(true, tx_commit_index(State)), case MSCStateP of undefined -> ok; @@ -377,17 +378,17 @@ terminate(State) -> Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], - State1 #vqstate { index_state = rabbit_queue_index:terminate( - Terms, IndexState), + State1 #vqstate { index_state = rabbit_queue_index:terminate( + Terms, IndexState), msg_store_clients = undefined }. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> {_PurgeCount, State1} = purge(State), - State2 = #vqstate { index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, - {MSCStateT, TRef}}, + State2 = #vqstate { index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, + {MSCStateT, TRef}}, transient_threshold = TransientThreshold } = remove_pending_ack(false, State1), %% flushing here is good because it deletes all full segments, @@ -409,16 +410,18 @@ delete_and_terminate(State) -> end, rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateT), - State2 #vqstate { index_state = IndexState5, + State2 #vqstate { index_state = IndexState5, msg_store_clients = undefined }. purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> {Q4Count, IndexState1} = remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState), {Len, State1} = - purge1(Q4Count, State #vqstate { q4 = queue:new(), + purge1(Q4Count, State #vqstate { q4 = queue:new(), index_state = IndexState1 }), - {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0, + {Len, State1 #vqstate { len = 0, + ram_msg_count = 0, + ram_index_count = 0, persistent_count = 0 }}. publish(Msg, State) -> @@ -430,15 +433,15 @@ publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, State}; publish_delivered(true, Msg = #basic_message { guid = Guid, is_persistent = IsPersistent }, - State = #vqstate { len = 0, - index_state = IndexState, - next_seq_id = SeqId, - out_counter = OutCount, - in_counter = InCount, + State = #vqstate { len = 0, + index_state = IndexState, msg_store_clients = MSCState, - persistent_count = PCount, - pending_ack = PA, - durable = IsDurable }) -> + next_seq_id = SeqId, + out_counter = OutCount, + in_counter = InCount, + persistent_count = PCount, + pending_ack = PA, + durable = IsDurable }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1, @@ -447,10 +450,10 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, maybe_write_msg_to_disk(false, MsgStatus, MSCState), PCount1 = maybe_inc(PCount, IsPersistent1), State1 = State #vqstate { msg_store_clients = MSCState1, - persistent_count = PCount1, - next_seq_id = SeqId + 1, - out_counter = OutCount + 1, - in_counter = InCount + 1 }, + persistent_count = PCount1, + next_seq_id = SeqId + 1, + out_counter = OutCount + 1, + in_counter = InCount + 1 }, {SeqId, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = @@ -462,10 +465,13 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, State1 #vqstate { pending_ack = PA1 } end}. -fetch(AckRequired, State = - #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount, - index_state = IndexState, len = Len, persistent_count = PCount, - pending_ack = PA }) -> +fetch(AckRequired, State = #vqstate { q4 = Q4, + ram_msg_count = RamMsgCount, + out_counter = OutCount, + index_state = IndexState, + len = Len, + persistent_count = PCount, + pending_ack = PA }) -> case queue:out(Q4) of {empty, _Q4} -> case fetch_from_q3_or_delta(State) of @@ -531,17 +537,20 @@ fetch(AckRequired, State = PCount1 = maybe_dec(PCount, IsPersistent andalso not AckRequired), Len1 = Len - 1, {{Msg, IsDelivered, AckTag, Len1}, - State #vqstate { q4 = Q4a, out_counter = OutCount + 1, - ram_msg_count = RamMsgCount - 1, - index_state = IndexState3, len = Len1, - pending_ack = PA1, persistent_count = PCount1 }} + State #vqstate { q4 = Q4a, + ram_msg_count = RamMsgCount - 1, + out_counter = OutCount + 1, + index_state = IndexState3, + len = Len1, + persistent_count = PCount1, + pending_ack = PA1 }} end. ack([], State) -> State; -ack(AckTags, State = #vqstate { index_state = IndexState, +ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, - pending_ack = PA }) -> + pending_ack = PA }) -> {GuidsByStore, SeqIds, PA1} = lists:foldl( fun (SeqId, {Dict, SeqIds, PAN}) -> @@ -568,13 +577,13 @@ ack(AckTags, State = #vqstate { index_state = IndexState, error -> 0; {ok, Guids} -> length(Guids) end, - State #vqstate { index_state = IndexState1, persistent_count = PCount1, - pending_ack = PA1 }. + State #vqstate { index_state = IndexState1, + persistent_count = PCount1, + pending_ack = PA1 }. tx_publish(Txn, Msg = #basic_message { is_persistent = true, guid = Guid }, - State = #vqstate { msg_store_clients = MSCState, - durable = true }) -> + State = #vqstate { msg_store_clients = MSCState, durable = true }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, @@ -622,12 +631,12 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) -> requeue(AckTags, State) -> {SeqIds, GuidsByStore, - State1 = #vqstate { index_state = IndexState, + State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun (SeqId, {SeqIdsAcc, Dict, StateN = - #vqstate { msg_store_clients = MSCStateN, - pending_ack = PAN }}) -> + fun (SeqId, {SeqIdsAcc, Dict, StateN = #vqstate { + msg_store_clients = MSCStateN, + pending_ack = PAN }}) -> PAN1 = dict:erase(SeqId, PAN), StateN1 = StateN #vqstate { pending_ack = PAN1 }, case dict:find(SeqId, PAN) of @@ -662,7 +671,7 @@ requeue(AckTags, State) -> error -> 0; {ok, Guids} -> length(Guids) end, - State1 #vqstate { index_state = IndexState1, + State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. len(#vqstate { len = Len }) -> @@ -671,11 +680,11 @@ len(#vqstate { len = Len }) -> is_empty(State) -> 0 == len(State). -set_ram_duration_target( - DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, - target_ram_msg_count = TargetRamMsgCount - }) -> +set_ram_duration_target(DurationTarget, + State = #vqstate { + avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate, + target_ram_msg_count = TargetRamMsgCount }) -> Rate = AvgEgressRate + AvgIngressRate, TargetRamMsgCount1 = case DurationTarget of @@ -684,23 +693,23 @@ set_ram_duration_target( _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec end, State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1, - duration_target = DurationTarget }, + duration_target = DurationTarget }, case TargetRamMsgCount1 == undefined orelse TargetRamMsgCount1 >= TargetRamMsgCount of true -> State1; false -> reduce_memory_use(State1) end. -ram_duration(State = #vqstate { egress_rate = Egress, - ingress_rate = Ingress, - rate_timestamp = Timestamp, - in_counter = InCount, - out_counter = OutCount, - ram_msg_count = RamMsgCount, - duration_target = DurationTarget, +ram_duration(State = #vqstate { egress_rate = Egress, + ingress_rate = Ingress, + rate_timestamp = Timestamp, + in_counter = InCount, + out_counter = OutCount, + ram_msg_count = RamMsgCount, + duration_target = DurationTarget, ram_msg_count_prev = RamMsgCountPrev }) -> Now = now(), - {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), + {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress), {AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress), Duration = %% msgs / (msgs/sec) == sec @@ -710,15 +719,16 @@ ram_duration(State = #vqstate { egress_rate = Egress, (2 * (AvgEgressRate + AvgIngressRate)) end, - {Duration, set_ram_duration_target( - DurationTarget, - State #vqstate { egress_rate = Egress1, - avg_egress_rate = AvgEgressRate, - ingress_rate = Ingress1, - avg_ingress_rate = AvgIngressRate, - rate_timestamp = Now, - ram_msg_count_prev = RamMsgCount, - out_counter = 0, in_counter = 0 })}. + {Duration, set_ram_duration_target(DurationTarget, + State #vqstate { + egress_rate = Egress1, + avg_egress_rate = AvgEgressRate, + ingress_rate = Ingress1, + avg_ingress_rate = AvgIngressRate, + rate_timestamp = Now, + in_counter = 0, + out_counter = 0, + ram_msg_count_prev = RamMsgCount })}. needs_sync(#vqstate { on_sync = {_, _, []} }) -> false; needs_sync(_) -> true. @@ -729,26 +739,27 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) -> State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }. status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, - len = Len, on_sync = {_, _, From}, + len = Len, + on_sync = {_, _, From}, target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, - avg_egress_rate = AvgEgressRate, - avg_ingress_rate = AvgIngressRate, - next_seq_id = NextSeqId }) -> - [ {q1, queue:len(Q1)}, - {q2, bpqueue:len(Q2)}, - {delta, Delta}, - {q3, bpqueue:len(Q3)}, - {q4, queue:len(Q4)}, - {len, Len}, - {outstanding_txns, length(From)}, - {target_ram_msg_count, TargetRamMsgCount}, - {ram_msg_count, RamMsgCount}, - {ram_index_count, RamIndexCount}, - {avg_egress_rate, AvgEgressRate}, - {avg_ingress_rate, AvgIngressRate}, - {next_seq_id, NextSeqId} ]. + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, + avg_egress_rate = AvgEgressRate, + avg_ingress_rate = AvgIngressRate, + next_seq_id = NextSeqId }) -> + [ {q1 , queue:len(Q1)}, + {q2 , bpqueue:len(Q2)}, + {delta , Delta}, + {q3 , bpqueue:len(Q3)}, + {q4 , queue:len(Q4)}, + {len , Len}, + {outstanding_txns , length(From)}, + {target_ram_msg_count , TargetRamMsgCount}, + {ram_msg_count , RamMsgCount}, + {ram_index_count , RamIndexCount}, + {avg_egress_rate , AvgEgressRate}, + {avg_ingress_rate , AvgIngressRate}, + {next_seq_id , NextSeqId} ]. %%---------------------------------------------------------------------------- %% Minor helpers @@ -872,19 +883,22 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) -> %% the first arg is the older delta combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) -> ?BLANK_DELTA; -combine_deltas(?BLANK_DELTA_PATTERN(X), - #delta { start_seq_id = Start, count = Count, - end_seq_id = End } = B) -> +combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start, + count = Count, + end_seq_id = End } = B) -> true = Start + Count =< End, %% ASSERTION B; -combine_deltas(#delta { start_seq_id = Start, count = Count, - end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> +combine_deltas(#delta { start_seq_id = Start, + count = Count, + end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) -> true = Start + Count =< End, %% ASSERTION A; -combine_deltas(#delta { start_seq_id = StartLow, count = CountLow, - end_seq_id = EndLow }, - #delta { start_seq_id = StartHigh, count = CountHigh, - end_seq_id = EndHigh }) -> +combine_deltas(#delta { start_seq_id = StartLow, + count = CountLow, + end_seq_id = EndLow }, + #delta { start_seq_id = StartHigh, + count = CountHigh, + end_seq_id = EndHigh }) -> Count = CountLow + CountHigh, true = (StartLow =< StartHigh) %% ASSERTIONS andalso ((StartLow + CountLow) =< EndLow) @@ -897,7 +911,9 @@ beta_fold_no_index_on_disk(Fun, Init, Q) -> permitted_ram_index_count(#vqstate { len = 0 }) -> undefined; -permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3, +permitted_ram_index_count(#vqstate { len = Len, + q2 = Q2, + q3 = Q3, delta = #delta { count = DeltaCount } }) -> AlphaBetaLen = Len - DeltaCount, case AlphaBetaLen == 0 of @@ -936,9 +952,11 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) -> end) end. -tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = - #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns}, - pending_ack = PA, durable = IsDurable }) -> +tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, + State = #vqstate { + on_sync = OnSync = {SAcks, SPubs, SFuns}, + pending_ack = PA, + durable = IsDurable }) -> %% If we are a non-durable queue, or (no persisent pubs, and no %% persistent acks) then we can skip the queue_index loop. case (not IsDurable) orelse @@ -946,7 +964,7 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State = lists:foldl( fun (AckTag, true ) -> case dict:find(AckTag, PA) of - {ok, #msg_status{}} -> true; + {ok, #msg_status {}} -> true; {ok, {IsPersistent, _Guid}} -> not IsPersistent end; (_AckTag, false) -> false @@ -1006,15 +1024,15 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) -> remove_queue_entries(fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState), {Count + Q1Count, - State #vqstate { q1 = queue:new(), + State #vqstate { q1 = queue:new(), index_state = IndexState1 }}; false -> {Q3Count, IndexState1} = remove_queue_entries(fun beta_fold_no_index_on_disk/3, Q3, IndexState), purge1(Count + Q3Count, maybe_deltas_to_betas( - State #vqstate { index_state = IndexState1, - q3 = bpqueue:new() })) + State #vqstate { q3 = bpqueue:new(), + index_state = IndexState1 })) end. remove_queue_entries(Fold, Q, IndexState) -> @@ -1052,10 +1070,13 @@ remove_queue_entries1( {CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { - q1 = Q1, q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount, - ram_index_count = RamIndexCount, + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4, + ram_msg_count = RamMsgCount, + ram_index_count = RamIndexCount, msg_store_clients = MSCState }) -> case bpqueue:out(Q3) of {empty, _Q3} -> @@ -1072,9 +1093,10 @@ fetch_from_q3_or_delta(State = #vqstate { Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = maybe_dec(RamIndexCount, not IndexOnDisk), true = RamIndexCount1 >= 0, %% ASSERTION - State1 = State #vqstate { q3 = Q3a, q4 = Q4a, - ram_msg_count = RamMsgCount + 1, - ram_index_count = RamIndexCount1, + State1 = State #vqstate { q3 = Q3a, + q4 = Q4a, + ram_msg_count = RamMsgCount + 1, + ram_index_count = RamIndexCount1, msg_store_clients = MSCState1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of @@ -1096,12 +1118,13 @@ fetch_from_q3_or_delta(State = #vqstate { {loaded, State2} end. -reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) +reduce_memory_use(State = #vqstate { + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; -reduce_memory_use(State = - #vqstate { target_ram_msg_count = TargetRamMsgCount }) -> +reduce_memory_use(State = #vqstate { + target_ram_msg_count = TargetRamMsgCount }) -> State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)), case TargetRamMsgCount of 0 -> push_betas_to_deltas(State1); @@ -1113,8 +1136,9 @@ reduce_memory_use(State = %%---------------------------------------------------------------------------- test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount, - q1 = Q1, q3 = Q3 }) -> + ram_msg_count = RamMsgCount, + q1 = Q1, + q3 = Q3 }) -> case TargetRamMsgCount of undefined -> msg; @@ -1150,9 +1174,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, end. publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, - IsDelivered, MsgOnDisk, State = - #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, - persistent_count = PCount, durable = IsDurable }) -> + IsDelivered, MsgOnDisk, + State = #vqstate { next_seq_id = SeqId, + len = Len, + in_counter = InCount, + persistent_count = PCount, + durable = IsDurable }) -> MsgStatus = #msg_status { msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsDurable andalso IsPersistent, @@ -1160,8 +1187,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, index_on_disk = false }, PCount1 = maybe_inc(PCount, IsPersistent), {SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus, - State #vqstate { next_seq_id = SeqId + 1, len = Len + 1, - in_counter = InCount + 1, + State #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, persistent_count = PCount1 })}. publish(msg, MsgStatus, State) -> @@ -1186,14 +1214,17 @@ publish(neither, MsgStatus, State) -> State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} = maybe_write_to_disk(true, true, MsgStatus, State), true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION - Delta1 = #delta { start_seq_id = SeqId, count = 1, - end_seq_id = SeqId + 1 }, + Delta1 = #delta { start_seq_id = SeqId, + count = 1, + end_seq_id = SeqId + 1 }, State1 #vqstate { delta = combine_deltas(Delta, Delta1) }. -store_alpha_entry(MsgStatus, State = - #vqstate { q1 = Q1, q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, q4 = Q4 }) -> +store_alpha_entry(MsgStatus, State = #vqstate { + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4 }) -> case bpqueue:is_empty(Q2) andalso 0 == DeltaCount andalso bpqueue:is_empty(Q3) of true -> true = queue:is_empty(Q1), %% ASSERTION @@ -1204,9 +1235,9 @@ store_alpha_entry(MsgStatus, State = store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, - State = #vqstate { q2 = Q2, + State = #vqstate { q2 = Q2, delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> + q3 = Q3 }) -> MsgStatus1 = MsgStatus #msg_status { msg = undefined }, case DeltaCount == 0 of true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, @@ -1271,13 +1302,13 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) -> {MsgStatus, IndexState}. maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, - State = #vqstate { msg_store_clients = MSCState, - index_state = IndexState }) -> + State = #vqstate { index_state = IndexState, + msg_store_clients = MSCState }) -> {MsgStatus1, MSCState1} = maybe_write_msg_to_disk( ForceMsg, MsgStatus, MSCState), {MsgStatus2, IndexState1} = maybe_write_index_to_disk( ForceIndex, MsgStatus1, IndexState), - {MsgStatus2, State #vqstate { index_state = IndexState1, + {MsgStatus2, State #vqstate { index_state = IndexState1, msg_store_clients = MSCState1 }}. %%---------------------------------------------------------------------------- @@ -1321,9 +1352,9 @@ limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 }) limit_q3_ram_index(Reduction, State) -> {Reduction, State}. -limit_ram_index(MapFoldFilterFun, Q, Reduction, State = - #vqstate { ram_index_count = RamIndexCount, - index_state = IndexState }) -> +limit_ram_index(MapFoldFilterFun, Q, Reduction, + State = #vqstate { ram_index_count = RamIndexCount, + index_state = IndexState }) -> {Qa, {Reduction1, IndexState1}} = MapFoldFilterFun( fun erlang:'not'/1, @@ -1337,18 +1368,18 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State = {true, MsgStatus1, {N-1, IndexStateN1}} end, {Reduction, IndexState}, Q), RamIndexCount1 = RamIndexCount - (Reduction - Reduction1), - {Qa, Reduction1, State #vqstate { index_state = IndexState1, + {Qa, Reduction1, State #vqstate { index_state = IndexState1, ram_index_count = RamIndexCount1 }}. maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> State; -maybe_deltas_to_betas( - State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3, - target_ram_msg_count = TargetRamMsgCount, - delta = Delta = #delta { start_seq_id = DeltaSeqId, - count = DeltaCount, - end_seq_id = DeltaSeqIdEnd }, - transient_threshold = TransientThreshold}) -> +maybe_deltas_to_betas(State = #vqstate { + q2 = Q2, + delta = Delta, + q3 = Q3, + index_state = IndexState, + target_ram_msg_count = TargetRamMsgCount, + transient_threshold = TransientThreshold }) -> case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of true -> State; @@ -1356,6 +1387,9 @@ maybe_deltas_to_betas( %% either q3 is empty, in which case we load at least one %% segment, or TargetRamMsgCount > 0, meaning we should %% really be holding all the betas in memory. + #delta { start_seq_id = DeltaSeqId, + count = DeltaCount, + end_seq_id = DeltaSeqIdEnd } = Delta, {List, IndexState1, Delta1SeqId} = read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState), %% length(List) may be < segment_size because of acks. It @@ -1375,15 +1409,15 @@ maybe_deltas_to_betas( 0 -> %% delta is now empty, but it wasn't %% before, so can now join q2 onto q3 - State1 #vqstate { delta = ?BLANK_DELTA, - q2 = bpqueue:new(), - q3 = bpqueue:join(Q3b, Q2) }; + State1 #vqstate { q2 = bpqueue:new(), + delta = ?BLANK_DELTA, + q3 = bpqueue:join(Q3b, Q2) }; N when N > 0 -> - State1 #vqstate { - q3 = Q3b, - delta = #delta { start_seq_id = Delta1SeqId, - count = N, - end_seq_id = DeltaSeqIdEnd } } + Delta1 = #delta { start_seq_id = Delta1SeqId, + count = N, + end_seq_id = DeltaSeqIdEnd }, + State1 #vqstate { delta = Delta1, + q3 = Q3b } end end end. @@ -1407,10 +1441,10 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> q4 = Q4a } end, Q4, State). -maybe_push_alphas_to_betas( - _Generator, _Consumer, _Q, - State = #vqstate { ram_msg_count = RamMsgCount, - target_ram_msg_count = TargetRamMsgCount }) +maybe_push_alphas_to_betas(_Generator, _Consumer, _Q, + State = #vqstate { + ram_msg_count = RamMsgCount, + target_ram_msg_count = TargetRamMsgCount }) when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount -> State; maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> @@ -1420,7 +1454,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> ForceIndex = should_force_index_to_disk(State), {MsgStatus1 = #msg_status { msg_on_disk = true, index_on_disk = IndexOnDisk }, - State1 = #vqstate { ram_msg_count = RamMsgCount, + State1 = #vqstate { ram_msg_count = RamMsgCount, ram_index_count = RamIndexCount }} = maybe_write_to_disk(true, ForceIndex, MsgStatus, State), RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk), @@ -1430,9 +1464,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) -> Consumer(MsgStatus1, Qa, State2)) end. -push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, +push_betas_to_deltas(State = #vqstate { q2 = Q2, + delta = Delta, + q3 = Q3, ram_index_count = RamIndexCount, - index_state = IndexState }) -> + index_state = IndexState }) -> %% HighSeqId is high in the sense that it must be higher than the %% seq_id in Delta, but it's also the lowest of the betas that we %% transfer from q2 to delta. @@ -1449,10 +1485,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, end, Delta1 = #delta { start_seq_id = Delta1SeqId } = combine_deltas(Delta, #delta { start_seq_id = HighSeqId, - count = Len1, - end_seq_id = EndSeqId }), - State1 = State #vqstate { q2 = bpqueue:new(), delta = Delta1, - index_state = IndexState1, + count = Len1, + end_seq_id = EndSeqId }), + State1 = State #vqstate { q2 = bpqueue:new(), + delta = Delta1, + index_state = IndexState1, ram_index_count = RamIndexCount1 }, case bpqueue:out(Q3) of {empty, _Q3} -> @@ -1479,12 +1516,13 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, {SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} = push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3, RamIndexCount1, IndexState1), - Delta2 = combine_deltas(#delta { start_seq_id = Limit, - count = Len2, - end_seq_id = SeqIdMax+1 }, - Delta1), - State1 #vqstate { q3 = Q3a, delta = Delta2, - index_state = IndexState2, + Delta2 = #delta { start_seq_id = Limit, + count = Len2, + end_seq_id = SeqIdMax + 1 }, + Delta3 = combine_deltas(Delta2, Delta1), + State1 #vqstate { delta = Delta3, + q3 = Q3a, + index_state = IndexState2, ram_index_count = RamIndexCount2 } end end. |
