summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_variable_queue.erl137
1 files changed, 63 insertions, 74 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 5b1419efab..3e66e000b2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -451,14 +451,13 @@ publish_delivered(true, Msg = #basic_message { guid = Guid,
in_counter = InCount + 1 },
{SeqId,
case MsgStatus1 #msg_status.msg_on_disk of
- true ->
- {#msg_status { index_on_disk = true }, IndexState1} =
- maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- State1 #vqstate { index_state = IndexState1,
- pending_ack = dict:store(SeqId, {true, Guid},
- PA) };
- false ->
- State1 #vqstate { pending_ack = dict:store(SeqId, MsgStatus1, PA) }
+ true -> {#msg_status { index_on_disk = true }, IndexState1} =
+ maybe_write_index_to_disk(false, MsgStatus1, IndexState),
+ PA1 = dict:store(SeqId, {true, Guid}, PA),
+ State1 #vqstate { index_state = IndexState1,
+ pending_ack = PA1 };
+ false -> PA1 = dict:store(SeqId, MsgStatus1, PA),
+ State1 #vqstate { pending_ack = PA1 }
end}.
fetch(AckRequired, State =
@@ -517,14 +516,13 @@ fetch(AckRequired, State =
%% 4. If an ack is required, add something sensible to PA
PA1 = case AckRequired of
- true ->
- Entry =
- case MsgOnDisk of
- true -> {IsPersistent, Guid};
- false -> MsgStatus #msg_status {
- is_delivered = true }
- end,
- dict:store(SeqId, Entry, PA);
+ true -> Entry =
+ case MsgOnDisk of
+ true -> {IsPersistent, Guid};
+ false -> MsgStatus #msg_status {
+ is_delivered = true }
+ end,
+ dict:store(SeqId, Entry, PA);
false -> PA
end,
@@ -611,15 +609,13 @@ tx_commit(Txn, Fun, State = #vqstate { durable = IsDurable }) ->
IsTransientPubs = [] == PersistentGuids,
{AckTags1,
case (not IsDurable) orelse IsTransientPubs of
- true ->
- tx_commit_post_msg_store(
- IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
- false ->
- ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids, IsTransientPubs,
- PubsOrdered, AckTags1, Fun)),
- State
+ true -> tx_commit_post_msg_store(
+ IsTransientPubs, PubsOrdered, AckTags1, Fun, State);
+ false -> ok = rabbit_msg_store:sync(
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
+ msg_store_callback(PersistentGuids, IsTransientPubs,
+ PubsOrdered, AckTags1, Fun)),
+ State
end}.
requeue(AckTags, State) ->
@@ -681,9 +677,9 @@ set_ram_duration_target(
Rate = AvgEgressRate + AvgIngressRate,
TargetRamMsgCount1 =
case DurationTarget of
- infinity -> undefined;
+ infinity -> undefined;
undefined -> undefined;
- _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
end,
State1 = State #vqstate { target_ram_msg_count = TargetRamMsgCount1,
duration_target = DurationTarget },
@@ -895,9 +891,7 @@ combine_deltas(#delta { start_seq_id = StartLow, count = CountLow,
#delta { start_seq_id = StartLow, count = Count, end_seq_id = EndHigh }.
beta_fold_no_index_on_disk(Fun, Init, Q) ->
- bpqueue:foldr(fun (_Prefix, Value, Acc) ->
- Fun(Value, Acc)
- end, Init, Q).
+ bpqueue:foldr(fun (_Prefix, Value, Acc) -> Fun(Value, Acc) end, Init, Q).
permitted_ram_index_count(#vqstate { len = 0 }) ->
undefined;
@@ -905,13 +899,11 @@ permitted_ram_index_count(#vqstate { len = Len, q2 = Q2, q3 = Q3,
delta = #delta { count = DeltaCount } }) ->
AlphaBetaLen = Len - DeltaCount,
case AlphaBetaLen == 0 of
- true ->
- undefined;
- false ->
- BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
- %% the fraction of the alphas+betas that are betas
- BetaFrac = BetaLen / AlphaBetaLen,
- BetaLen - trunc(BetaFrac * BetaLen)
+ true -> undefined;
+ false -> BetaLen = bpqueue:len(Q2) + bpqueue:len(Q3),
+ %% the fraction of the alphas+betas that are betas
+ BetaFrac = BetaLen / AlphaBetaLen,
+ BetaLen - trunc(BetaFrac * BetaLen)
end.
@@ -1008,20 +1000,19 @@ delete1(TransientThreshold, NextSeqId, DeltaSeqId, IndexState) ->
purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState }) ->
case bpqueue:is_empty(Q3) of
- true ->
- {Q1Count, IndexState1} =
- remove_queue_entries(
- fun rabbit_misc:queue_fold/3, State #vqstate.q1, IndexState),
- {Count + Q1Count, State #vqstate { q1 = queue:new(),
- index_state = IndexState1 }};
- false ->
- {Q3Count, IndexState1} =
- remove_queue_entries(
- fun beta_fold_no_index_on_disk/3, Q3, IndexState),
- purge1(Count + Q3Count,
- maybe_deltas_to_betas(
- State #vqstate { index_state = IndexState1,
- q3 = bpqueue:new() }))
+ true -> {Q1Count, IndexState1} =
+ remove_queue_entries(fun rabbit_misc:queue_fold/3,
+ State #vqstate.q1, IndexState),
+ {Count + Q1Count,
+ State #vqstate { q1 = queue:new(),
+ index_state = IndexState1 }};
+ false -> {Q3Count, IndexState1} =
+ remove_queue_entries(fun beta_fold_no_index_on_disk/3,
+ Q3, IndexState),
+ purge1(Count + Q3Count,
+ maybe_deltas_to_betas(
+ State #vqstate { index_state = IndexState1,
+ q3 = bpqueue:new() }))
end.
remove_queue_entries(Fold, Q, IndexState) ->
@@ -1031,12 +1022,10 @@ remove_queue_entries(Fold, Q, IndexState) ->
ok = dict:fold(fun (MsgStore, Guids, ok) ->
rabbit_msg_store:remove(MsgStore, Guids)
end, ok, GuidsByStore),
- IndexState2 =
- case SeqIds of
- [] -> IndexState1;
- _ -> rabbit_queue_index:ack(SeqIds, IndexState1)
- end,
- {Count, IndexState2}.
+ {Count, case SeqIds of
+ [] -> IndexState1;
+ _ -> rabbit_queue_index:ack(SeqIds, IndexState1)
+ end}.
remove_queue_entries1(
#msg_status { guid = Guid, seq_id = SeqId,
@@ -1045,12 +1034,11 @@ remove_queue_entries1(
{CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
GuidsByStore1 =
case {MsgOnDisk, IsPersistent} of
- {true, true} ->
- rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE, Guid, GuidsByStore);
- {true, false} ->
- rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore);
- {false, _} ->
- GuidsByStore
+ {true, true} -> rabbit_misc:dict_cons(?PERSISTENT_MSG_STORE,
+ Guid, GuidsByStore);
+ {true, false} -> rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE,
+ Guid, GuidsByStore);
+ {false, _} -> GuidsByStore
end,
SeqIdsAcc1 = case IndexOnDisk of
true -> [SeqId | SeqIdsAcc];
@@ -1236,10 +1224,10 @@ store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
q3 = Q3 }) ->
MsgStatus1 = MsgStatus #msg_status { msg = undefined },
case DeltaCount == 0 of
- true ->
- State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) };
- false ->
- State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
+ true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1,
+ Q3) };
+ false -> State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1,
+ Q2) }
end.
find_msg_store(true) -> ?PERSISTENT_MSG_STORE;
@@ -1309,12 +1297,12 @@ limit_ram_index(State = #vqstate { ram_index_count = RamIndexCount }) ->
Reduction = lists:min([RamIndexCount - Permitted,
?RAM_INDEX_BATCH_SIZE]),
case Reduction < ?RAM_INDEX_BATCH_SIZE of
- true ->
- State;
- false ->
- {Reduction1, State1} = limit_q2_ram_index(Reduction, State),
- {_Red2, State2} = limit_q3_ram_index(Reduction1, State1),
- State2
+ true -> State;
+ false -> {Reduction1, State1} =
+ limit_q2_ram_index(Reduction, State),
+ {_Red2, State2} =
+ limit_q3_ram_index(Reduction1, State1),
+ State2
end;
_ ->
State
@@ -1531,7 +1519,8 @@ push_betas_to_deltas(Generator, Limit, Q, Count, RamIndexCount, IndexState) ->
{{value, IndexOnDisk, MsgStatus}, Qa} ->
{RamIndexCount1, IndexState1} =
case IndexOnDisk of
- true -> {RamIndexCount, IndexState};
+ true ->
+ {RamIndexCount, IndexState};
false ->
{#msg_status { index_on_disk = true }, IndexState2} =
maybe_write_index_to_disk(true, MsgStatus,