summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-06-03 22:16:23 +0100
committerMatthias Radestock <matthias@lshift.net>2010-06-03 22:16:23 +0100
commitc344088af6adc0d7e7027e2e13afd4e70c57d180 (patch)
treeda704be0c7b802190f4b10288ea7c3df470a3502 /src
parent2ef796e992cdaf6637a098eae0d011fab6ac51a1 (diff)
downloadrabbitmq-server-git-c344088af6adc0d7e7027e2e13afd4e70c57d180.tar.gz
cosmetic
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl386
1 files changed, 212 insertions, 174 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index c0aecd1b30..b2eb4dc4eb 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -235,8 +235,8 @@
-type(ack() :: seq_id() | 'blank_ack').
-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
- count :: non_neg_integer (),
- end_seq_id :: non_neg_integer() }).
+ count :: non_neg_integer (),
+ end_seq_id :: non_neg_integer() }).
-type(state() :: #vqstate {
q1 :: queue(),
@@ -276,11 +276,11 @@
-endif.
-define(BLANK_DELTA, #delta { start_seq_id = undefined,
- count = 0,
- end_seq_id = undefined }).
+ count = 0,
+ end_seq_id = undefined }).
-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
- count = 0,
- end_seq_id = Z }).
+ count = 0,
+ end_seq_id = Z }).
%%----------------------------------------------------------------------------
%% Public API
@@ -323,8 +323,8 @@ init(QueueName, IsDurable, _Recover) ->
Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
true -> ?BLANK_DELTA;
false -> #delta { start_seq_id = LowSeqId,
- count = DeltaCount1,
- end_seq_id = NextSeqId }
+ count = DeltaCount1,
+ end_seq_id = NextSeqId }
end,
Now = now(),
PersistentClient =
@@ -365,9 +365,10 @@ init(QueueName, IsDurable, _Recover) ->
maybe_deltas_to_betas(State).
terminate(State) ->
- State1 = #vqstate {
- persistent_count = PCount, index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } =
+ State1 = #vqstate { persistent_count = PCount,
+ index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef},
+ {MSCStateT, TRef}} } =
remove_pending_ack(true, tx_commit_index(State)),
case MSCStateP of
undefined -> ok;
@@ -377,17 +378,17 @@ terminate(State) ->
Terms = [{persistent_ref, PRef},
{transient_ref, TRef},
{persistent_count, PCount}],
- State1 #vqstate { index_state = rabbit_queue_index:terminate(
- Terms, IndexState),
+ State1 #vqstate { index_state = rabbit_queue_index:terminate(
+ Terms, IndexState),
msg_store_clients = undefined }.
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(State) ->
{_PurgeCount, State1} = purge(State),
- State2 = #vqstate { index_state = IndexState,
- msg_store_clients = {{MSCStateP, PRef},
- {MSCStateT, TRef}},
+ State2 = #vqstate { index_state = IndexState,
+ msg_store_clients = {{MSCStateP, PRef},
+ {MSCStateT, TRef}},
transient_threshold = TransientThreshold } =
remove_pending_ack(false, State1),
%% flushing here is good because it deletes all full segments,
@@ -409,16 +410,18 @@ delete_and_terminate(State) ->
end,
rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef),
rabbit_msg_store:client_terminate(MSCStateT),
- State2 #vqstate { index_state = IndexState5,
+ State2 #vqstate { index_state = IndexState5,
msg_store_clients = undefined }.
purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
{Q4Count, IndexState1} =
remove_queue_entries(fun rabbit_misc:queue_fold/3, Q4, IndexState),
{Len, State1} =
- purge1(Q4Count, State #vqstate { q4 = queue:new(),
+ purge1(Q4Count, State #vqstate { q4 = queue:new(),
index_state = IndexState1 }),
- {Len, State1 #vqstate { len = 0, ram_msg_count = 0, ram_index_count = 0,
+ {Len, State1 #vqstate { len = 0,
+ ram_msg_count = 0,
+ ram_index_count = 0,
persistent_count = 0 }}.
publish(Msg, State) ->
@@ -430,15 +433,15 @@ publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, State};
publish_delivered(true, Msg = #basic_message { guid = Guid,
is_persistent = IsPersistent },
- State = #vqstate { len = 0,
- index_state = IndexState,
- next_seq_id = SeqId,
- out_counter = OutCount,
- in_counter = InCount,
+ State = #vqstate { len = 0,
+ index_state = IndexState,
msg_store_clients = MSCState,
- persistent_count = PCount,
- pending_ack = PA,
- durable = IsDurable }) ->
+ next_seq_id = SeqId,
+ out_counter = OutCount,
+ in_counter = InCount,
+ persistent_count = PCount,
+ pending_ack = PA,
+ durable = IsDurable }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent1,
@@ -447,10 +450,10 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
maybe_write_msg_to_disk(false, MsgStatus, MSCState),
PCount1 = maybe_inc(PCount, IsPersistent1),
State1 = State #vqstate { msg_store_clients = MSCState1,
- persistent_count = PCount1,
- next_seq_id = SeqId + 1,
- out_counter = OutCount + 1,
- in_counter = InCount + 1 },
+ persistent_count = PCount1,
+ next_seq_id = SeqId + 1,
+ out_counter = OutCount + 1,
+ in_counter = InCount + 1 },
{SeqId,
case MsgStatus1 #msg_status.msg_on_disk of
true -> {#msg_status { index_on_disk = true }, IndexState1} =
@@ -462,10 +465,13 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
State1 #vqstate { pending_ack = PA1 }
end}.
-fetch(AckRequired, State =
- #vqstate { q4 = Q4, ram_msg_count = RamMsgCount, out_counter = OutCount,
- index_state = IndexState, len = Len, persistent_count = PCount,
- pending_ack = PA }) ->
+fetch(AckRequired, State = #vqstate { q4 = Q4,
+ ram_msg_count = RamMsgCount,
+ out_counter = OutCount,
+ index_state = IndexState,
+ len = Len,
+ persistent_count = PCount,
+ pending_ack = PA }) ->
case queue:out(Q4) of
{empty, _Q4} ->
case fetch_from_q3_or_delta(State) of
@@ -531,17 +537,20 @@ fetch(AckRequired, State =
PCount1 = maybe_dec(PCount, IsPersistent andalso not AckRequired),
Len1 = Len - 1,
{{Msg, IsDelivered, AckTag, Len1},
- State #vqstate { q4 = Q4a, out_counter = OutCount + 1,
- ram_msg_count = RamMsgCount - 1,
- index_state = IndexState3, len = Len1,
- pending_ack = PA1, persistent_count = PCount1 }}
+ State #vqstate { q4 = Q4a,
+ ram_msg_count = RamMsgCount - 1,
+ out_counter = OutCount + 1,
+ index_state = IndexState3,
+ len = Len1,
+ persistent_count = PCount1,
+ pending_ack = PA1 }}
end.
ack([], State) ->
State;
-ack(AckTags, State = #vqstate { index_state = IndexState,
+ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
- pending_ack = PA }) ->
+ pending_ack = PA }) ->
{GuidsByStore, SeqIds, PA1} =
lists:foldl(
fun (SeqId, {Dict, SeqIds, PAN}) ->
@@ -568,13 +577,13 @@ ack(AckTags, State = #vqstate { index_state = IndexState,
error -> 0;
{ok, Guids} -> length(Guids)
end,
- State #vqstate { index_state = IndexState1, persistent_count = PCount1,
- pending_ack = PA1 }.
+ State #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ pending_ack = PA1 }.
tx_publish(Txn,
Msg = #basic_message { is_persistent = true, guid = Guid },
- State = #vqstate { msg_store_clients = MSCState,
- durable = true }) ->
+ State = #vqstate { msg_store_clients = MSCState, durable = true }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
@@ -622,12 +631,12 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
requeue(AckTags, State) ->
{SeqIds, GuidsByStore,
- State1 = #vqstate { index_state = IndexState,
+ State1 = #vqstate { index_state = IndexState,
persistent_count = PCount }} =
lists:foldl(
- fun (SeqId, {SeqIdsAcc, Dict, StateN =
- #vqstate { msg_store_clients = MSCStateN,
- pending_ack = PAN }}) ->
+ fun (SeqId, {SeqIdsAcc, Dict, StateN = #vqstate {
+ msg_store_clients = MSCStateN,
+ pending_ack = PAN }}) ->
PAN1 = dict:erase(SeqId, PAN),
StateN1 = StateN #vqstate { pending_ack = PAN1 },
case dict:find(SeqId, PAN) of
@@ -662,7 +671,7 @@ requeue(AckTags, State) ->
error -> 0;
{ok, Guids} -> length(Guids)
end,
- State1 #vqstate { index_state = IndexState1,
+ State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
len(#vqstate { len = Len }) ->
@@ -671,11 +680,11 @@ len(#vqstate { len = Len }) ->
is_empty(State) ->
0 == len(State).
-set_ram_duration_target(
- DurationTarget, State = #vqstate { avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate,
- target_ram_msg_count = TargetRamMsgCount
- }) ->
+set_ram_duration_target(DurationTarget,
+ State = #vqstate {
+ avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate,
+ target_ram_msg_count = TargetRamMsgCount }) ->
Rate = AvgEgressRate + AvgIngressRate,
TargetRamMsgCount1 =
case DurationTarget of
@@ -684,23 +693,23 @@ set_ram_duration_target(
_ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
- duration_target = DurationTarget },
+ duration_target = DurationTarget },
case TargetRamMsgCount1 == undefined orelse
TargetRamMsgCount1 >= TargetRamMsgCount of
true -> State1;
false -> reduce_memory_use(State1)
end.
-ram_duration(State = #vqstate { egress_rate = Egress,
- ingress_rate = Ingress,
- rate_timestamp = Timestamp,
- in_counter = InCount,
- out_counter = OutCount,
- ram_msg_count = RamMsgCount,
- duration_target = DurationTarget,
+ram_duration(State = #vqstate { egress_rate = Egress,
+ ingress_rate = Ingress,
+ rate_timestamp = Timestamp,
+ in_counter = InCount,
+ out_counter = OutCount,
+ ram_msg_count = RamMsgCount,
+ duration_target = DurationTarget,
ram_msg_count_prev = RamMsgCountPrev }) ->
Now = now(),
- {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
+ {AvgEgressRate, Egress1} = update_rate(Now, Timestamp, OutCount, Egress),
{AvgIngressRate, Ingress1} = update_rate(Now, Timestamp, InCount, Ingress),
Duration = %% msgs / (msgs/sec) == sec
@@ -710,15 +719,16 @@ ram_duration(State = #vqstate { egress_rate = Egress,
(2 * (AvgEgressRate + AvgIngressRate))
end,
- {Duration, set_ram_duration_target(
- DurationTarget,
- State #vqstate { egress_rate = Egress1,
- avg_egress_rate = AvgEgressRate,
- ingress_rate = Ingress1,
- avg_ingress_rate = AvgIngressRate,
- rate_timestamp = Now,
- ram_msg_count_prev = RamMsgCount,
- out_counter = 0, in_counter = 0 })}.
+ {Duration, set_ram_duration_target(DurationTarget,
+ State #vqstate {
+ egress_rate = Egress1,
+ avg_egress_rate = AvgEgressRate,
+ ingress_rate = Ingress1,
+ avg_ingress_rate = AvgIngressRate,
+ rate_timestamp = Now,
+ in_counter = 0,
+ out_counter = 0,
+ ram_msg_count_prev = RamMsgCount })}.
needs_sync(#vqstate { on_sync = {_, _, []} }) -> false;
needs_sync(_) -> true.
@@ -729,26 +739,27 @@ handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
- len = Len, on_sync = {_, _, From},
+ len = Len,
+ on_sync = {_, _, From},
target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
- avg_egress_rate = AvgEgressRate,
- avg_ingress_rate = AvgIngressRate,
- next_seq_id = NextSeqId }) ->
- [ {q1, queue:len(Q1)},
- {q2, bpqueue:len(Q2)},
- {delta, Delta},
- {q3, bpqueue:len(Q3)},
- {q4, queue:len(Q4)},
- {len, Len},
- {outstanding_txns, length(From)},
- {target_ram_msg_count, TargetRamMsgCount},
- {ram_msg_count, RamMsgCount},
- {ram_index_count, RamIndexCount},
- {avg_egress_rate, AvgEgressRate},
- {avg_ingress_rate, AvgIngressRate},
- {next_seq_id, NextSeqId} ].
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
+ avg_egress_rate = AvgEgressRate,
+ avg_ingress_rate = AvgIngressRate,
+ next_seq_id = NextSeqId }) ->
+ [ {q1 , queue:len(Q1)},
+ {q2 , bpqueue:len(Q2)},
+ {delta , Delta},
+ {q3 , bpqueue:len(Q3)},
+ {q4 , queue:len(Q4)},
+ {len , Len},
+ {outstanding_txns , length(From)},
+ {target_ram_msg_count , TargetRamMsgCount},
+ {ram_msg_count , RamMsgCount},
+ {ram_index_count , RamIndexCount},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ingress_rate , AvgIngressRate},
+ {next_seq_id , NextSeqId} ].
%%----------------------------------------------------------------------------
%% Minor helpers
@@ -872,19 +883,22 @@ ensure_binary_properties(Msg = #basic_message { content = Content }) ->
%% the first arg is the older delta
combine_deltas(?BLANK_DELTA_PATTERN(X), ?BLANK_DELTA_PATTERN(Y)) ->
?BLANK_DELTA;
-combine_deltas(?BLANK_DELTA_PATTERN(X),
- #delta { start_seq_id = Start, count = Count,
- end_seq_id = End } = B) ->
+combine_deltas(?BLANK_DELTA_PATTERN(X), #delta { start_seq_id = Start,
+ count = Count,
+ end_seq_id = End } = B) ->
true = Start + Count =< End, %% ASSERTION
B;
-combine_deltas(#delta { start_seq_id = Start, count = Count,
- end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
+combine_deltas(#delta { start_seq_id = Start,
+ count = Count,
+ end_seq_id = End } = A, ?BLANK_DELTA_PATTERN(Y)) ->
true = Start + Count =< End, %% ASSERTION
A;
-combine_deltas(#delta { start_seq_id = StartLow, count = CountLow,
- end_seq_id = EndLow },
- #delta { start_seq_id = StartHigh, count = CountHigh,
- end_seq_id = EndHigh }) ->
+combine_deltas(#delta { start_seq_id = StartLow,
+ count = CountLow,
+ end_seq_id = EndLow },
+ #delta { start_seq_id = StartHigh,
+ count = CountHigh,
+ end_seq_id = EndHigh }) ->
Count = CountLow + CountHigh,
true = (StartLow =< StartHigh) %% ASSERTIONS
andalso ((StartLow + CountLow) =< EndLow)
@@ -897,7 +911,9 @@ beta_fold_no_index_on_disk(Fun, Init, Q) ->
permitted_ram_index_count(#vqstate { len = 0 }) ->
undefined;
-permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3,
+permitted_ram_index_count(#vqstate { len = Len,
+ q2 = Q2,
+ q3 = Q3,
delta = #delta { count = DeltaCount } }) ->
AlphaBetaLen = Len - DeltaCount,
case AlphaBetaLen == 0 of
@@ -936,9 +952,11 @@ msg_store_callback(PersistentGuids, IsTransientPubs, Pubs, AckTags, Fun) ->
end)
end.
-tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
- #vqstate { on_sync = OnSync = {SAcks, SPubs, SFuns},
- pending_ack = PA, durable = IsDurable }) ->
+tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun,
+ State = #vqstate {
+ on_sync = OnSync = {SAcks, SPubs, SFuns},
+ pending_ack = PA,
+ durable = IsDurable }) ->
%% If we are a non-durable queue, or (no persisent pubs, and no
%% persistent acks) then we can skip the queue_index loop.
case (not IsDurable) orelse
@@ -946,7 +964,7 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, Fun, State =
lists:foldl(
fun (AckTag, true ) ->
case dict:find(AckTag, PA) of
- {ok, #msg_status{}} -> true;
+ {ok, #msg_status {}} -> true;
{ok, {IsPersistent, _Guid}} -> not IsPersistent
end;
(_AckTag, false) -> false
@@ -1006,15 +1024,15 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
remove_queue_entries(fun rabbit_misc:queue_fold/3,
State #vqstate.q1, IndexState),
{Count + Q1Count,
- State #vqstate { q1 = queue:new(),
+ State #vqstate { q1 = queue:new(),
index_state = IndexState1 }};
false -> {Q3Count, IndexState1} =
remove_queue_entries(fun beta_fold_no_index_on_disk/3,
Q3, IndexState),
purge1(Count + Q3Count,
maybe_deltas_to_betas(
- State #vqstate { index_state = IndexState1,
- q3 = bpqueue:new() }))
+ State #vqstate { q3 = bpqueue:new(),
+ index_state = IndexState1 }))
end.
remove_queue_entries(Fold, Q, IndexState) ->
@@ -1052,10 +1070,13 @@ remove_queue_entries1(
{CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
- q1 = Q1, q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3, q4 = Q4, ram_msg_count = RamMsgCount,
- ram_index_count = RamIndexCount,
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4,
+ ram_msg_count = RamMsgCount,
+ ram_index_count = RamIndexCount,
msg_store_clients = MSCState }) ->
case bpqueue:out(Q3) of
{empty, _Q3} ->
@@ -1072,9 +1093,10 @@ fetch_from_q3_or_delta(State = #vqstate {
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = maybe_dec(RamIndexCount, not IndexOnDisk),
true = RamIndexCount1 >= 0, %% ASSERTION
- State1 = State #vqstate { q3 = Q3a, q4 = Q4a,
- ram_msg_count = RamMsgCount + 1,
- ram_index_count = RamIndexCount1,
+ State1 = State #vqstate { q3 = Q3a,
+ q4 = Q4a,
+ ram_msg_count = RamMsgCount + 1,
+ ram_index_count = RamIndexCount1,
msg_store_clients = MSCState1 },
State2 =
case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of
@@ -1096,12 +1118,13 @@ fetch_from_q3_or_delta(State = #vqstate {
{loaded, State2}
end.
-reduce_memory_use(State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
+reduce_memory_use(State = #vqstate {
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
-reduce_memory_use(State =
- #vqstate { target_ram_msg_count = TargetRamMsgCount }) ->
+reduce_memory_use(State = #vqstate {
+ target_ram_msg_count = TargetRamMsgCount }) ->
State1 = maybe_push_q4_to_betas(maybe_push_q1_to_betas(State)),
case TargetRamMsgCount of
0 -> push_betas_to_deltas(State1);
@@ -1113,8 +1136,9 @@ reduce_memory_use(State =
%%----------------------------------------------------------------------------
test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount,
- q1 = Q1, q3 = Q3 }) ->
+ ram_msg_count = RamMsgCount,
+ q1 = Q1,
+ q3 = Q3 }) ->
case TargetRamMsgCount of
undefined ->
msg;
@@ -1150,9 +1174,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
end.
publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
- IsDelivered, MsgOnDisk, State =
- #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
- persistent_count = PCount, durable = IsDurable }) ->
+ IsDelivered, MsgOnDisk,
+ State = #vqstate { next_seq_id = SeqId,
+ len = Len,
+ in_counter = InCount,
+ persistent_count = PCount,
+ durable = IsDurable }) ->
MsgStatus = #msg_status {
msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsDurable andalso IsPersistent,
@@ -1160,8 +1187,9 @@ publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
index_on_disk = false },
PCount1 = maybe_inc(PCount, IsPersistent),
{SeqId, publish(test_keep_msg_in_ram(SeqId, State), MsgStatus,
- State #vqstate { next_seq_id = SeqId + 1, len = Len + 1,
- in_counter = InCount + 1,
+ State #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
persistent_count = PCount1 })}.
publish(msg, MsgStatus, State) ->
@@ -1186,14 +1214,17 @@ publish(neither, MsgStatus, State) ->
State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} =
maybe_write_to_disk(true, true, MsgStatus, State),
true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
- Delta1 = #delta { start_seq_id = SeqId, count = 1,
- end_seq_id = SeqId + 1 },
+ Delta1 = #delta { start_seq_id = SeqId,
+ count = 1,
+ end_seq_id = SeqId + 1 },
State1 #vqstate { delta = combine_deltas(Delta, Delta1) }.
-store_alpha_entry(MsgStatus, State =
- #vqstate { q1 = Q1, q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3, q4 = Q4 }) ->
+store_alpha_entry(MsgStatus, State = #vqstate {
+ q1 = Q1,
+ q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3,
+ q4 = Q4 }) ->
case bpqueue:is_empty(Q2) andalso 0 == DeltaCount andalso
bpqueue:is_empty(Q3) of
true -> true = queue:is_empty(Q1), %% ASSERTION
@@ -1204,9 +1235,9 @@ store_alpha_entry(MsgStatus, State =
store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
- State = #vqstate { q2 = Q2,
+ State = #vqstate { q2 = Q2,
delta = #delta { count = DeltaCount },
- q3 = Q3 }) ->
+ q3 = Q3 }) ->
MsgStatus1 = MsgStatus #msg_status { msg = undefined },
case DeltaCount == 0 of
true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1,
@@ -1271,13 +1302,13 @@ maybe_write_index_to_disk(_Force, MsgStatus, IndexState) ->
{MsgStatus, IndexState}.
maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
- State = #vqstate { msg_store_clients = MSCState,
- index_state = IndexState }) ->
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(
ForceMsg, MsgStatus, MSCState),
{MsgStatus2, IndexState1} = maybe_write_index_to_disk(
ForceIndex, MsgStatus1, IndexState),
- {MsgStatus2, State #vqstate { index_state = IndexState1,
+ {MsgStatus2, State #vqstate { index_state = IndexState1,
msg_store_clients = MSCState1 }}.
%%----------------------------------------------------------------------------
@@ -1321,9 +1352,9 @@ limit_q3_ram_index(Reduction, State = #vqstate { q3 = Q3 })
limit_q3_ram_index(Reduction, State) ->
{Reduction, State}.
-limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
- #vqstate { ram_index_count = RamIndexCount,
- index_state = IndexState }) ->
+limit_ram_index(MapFoldFilterFun, Q, Reduction,
+ State = #vqstate { ram_index_count = RamIndexCount,
+ index_state = IndexState }) ->
{Qa, {Reduction1, IndexState1}} =
MapFoldFilterFun(
fun erlang:'not'/1,
@@ -1337,18 +1368,18 @@ limit_ram_index(MapFoldFilterFun, Q, Reduction, State =
{true, MsgStatus1, {N-1, IndexStateN1}}
end, {Reduction, IndexState}, Q),
RamIndexCount1 = RamIndexCount - (Reduction - Reduction1),
- {Qa, Reduction1, State #vqstate { index_state = IndexState1,
+ {Qa, Reduction1, State #vqstate { index_state = IndexState1,
ram_index_count = RamIndexCount1 }}.
maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
-maybe_deltas_to_betas(
- State = #vqstate { index_state = IndexState, q2 = Q2, q3 = Q3,
- target_ram_msg_count = TargetRamMsgCount,
- delta = Delta = #delta { start_seq_id = DeltaSeqId,
- count = DeltaCount,
- end_seq_id = DeltaSeqIdEnd },
- transient_threshold = TransientThreshold}) ->
+maybe_deltas_to_betas(State = #vqstate {
+ q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
+ index_state = IndexState,
+ target_ram_msg_count = TargetRamMsgCount,
+ transient_threshold = TransientThreshold }) ->
case (not bpqueue:is_empty(Q3)) andalso (0 == TargetRamMsgCount) of
true ->
State;
@@ -1356,6 +1387,9 @@ maybe_deltas_to_betas(
%% either q3 is empty, in which case we load at least one
%% segment, or TargetRamMsgCount > 0, meaning we should
%% really be holding all the betas in memory.
+ #delta { start_seq_id = DeltaSeqId,
+ count = DeltaCount,
+ end_seq_id = DeltaSeqIdEnd } = Delta,
{List, IndexState1, Delta1SeqId} =
read_one_index_segment(DeltaSeqId, DeltaSeqIdEnd, IndexState),
%% length(List) may be < segment_size because of acks. It
@@ -1375,15 +1409,15 @@ maybe_deltas_to_betas(
0 ->
%% delta is now empty, but it wasn't
%% before, so can now join q2 onto q3
- State1 #vqstate { delta = ?BLANK_DELTA,
- q2 = bpqueue:new(),
- q3 = bpqueue:join(Q3b, Q2) };
+ State1 #vqstate { q2 = bpqueue:new(),
+ delta = ?BLANK_DELTA,
+ q3 = bpqueue:join(Q3b, Q2) };
N when N > 0 ->
- State1 #vqstate {
- q3 = Q3b,
- delta = #delta { start_seq_id = Delta1SeqId,
- count = N,
- end_seq_id = DeltaSeqIdEnd } }
+ Delta1 = #delta { start_seq_id = Delta1SeqId,
+ count = N,
+ end_seq_id = DeltaSeqIdEnd },
+ State1 #vqstate { delta = Delta1,
+ q3 = Q3b }
end
end
end.
@@ -1407,10 +1441,10 @@ maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->
q4 = Q4a }
end, Q4, State).
-maybe_push_alphas_to_betas(
- _Generator, _Consumer, _Q,
- State = #vqstate { ram_msg_count = RamMsgCount,
- target_ram_msg_count = TargetRamMsgCount })
+maybe_push_alphas_to_betas(_Generator, _Consumer, _Q,
+ State = #vqstate {
+ ram_msg_count = RamMsgCount,
+ target_ram_msg_count = TargetRamMsgCount })
when TargetRamMsgCount == undefined orelse TargetRamMsgCount >= RamMsgCount ->
State;
maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
@@ -1420,7 +1454,7 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
ForceIndex = should_force_index_to_disk(State),
{MsgStatus1 = #msg_status { msg_on_disk = true,
index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_msg_count = RamMsgCount,
+ State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
RamIndexCount1 = maybe_inc(RamIndexCount, not IndexOnDisk),
@@ -1430,9 +1464,11 @@ maybe_push_alphas_to_betas(Generator, Consumer, Q, State) ->
Consumer(MsgStatus1, Qa, State2))
end.
-push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,
+push_betas_to_deltas(State = #vqstate { q2 = Q2,
+ delta = Delta,
+ q3 = Q3,
ram_index_count = RamIndexCount,
- index_state = IndexState }) ->
+ index_state = IndexState }) ->
%% HighSeqId is high in the sense that it must be higher than the
%% seq_id in Delta, but it's also the lowest of the betas that we
%% transfer from q2 to delta.
@@ -1449,10 +1485,11 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,
end,
Delta1 = #delta { start_seq_id = Delta1SeqId } =
combine_deltas(Delta, #delta { start_seq_id = HighSeqId,
- count = Len1,
- end_seq_id = EndSeqId }),
- State1 = State #vqstate { q2 = bpqueue:new(), delta = Delta1,
- index_state = IndexState1,
+ count = Len1,
+ end_seq_id = EndSeqId }),
+ State1 = State #vqstate { q2 = bpqueue:new(),
+ delta = Delta1,
+ index_state = IndexState1,
ram_index_count = RamIndexCount1 },
case bpqueue:out(Q3) of
{empty, _Q3} ->
@@ -1479,12 +1516,13 @@ push_betas_to_deltas(State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3,
{SeqIdMax, Len2, Q3a, RamIndexCount2, IndexState2} =
push_betas_to_deltas(fun bpqueue:out_r/1, Limit, Q3,
RamIndexCount1, IndexState1),
- Delta2 = combine_deltas(#delta { start_seq_id = Limit,
- count = Len2,
- end_seq_id = SeqIdMax+1 },
- Delta1),
- State1 #vqstate { q3 = Q3a, delta = Delta2,
- index_state = IndexState2,
+ Delta2 = #delta { start_seq_id = Limit,
+ count = Len2,
+ end_seq_id = SeqIdMax + 1 },
+ Delta3 = combine_deltas(Delta2, Delta1),
+ State1 #vqstate { delta = Delta3,
+ q3 = Q3a,
+ index_state = IndexState2,
ram_index_count = RamIndexCount2 }
end
end.