summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_variable_queue.erl119
1 files changed, 31 insertions, 88 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 23879df780..bd91283750 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -426,9 +426,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) ->
persistent_count = 0 })}.
publish(Msg, State) ->
- State1 = limit_ram_index(State),
- {_SeqId, State2} = publish(Msg, false, false, State1),
- a(State2).
+ {_SeqId, State1} = publish(Msg, false, false, State),
+ a(limit_ram_index(State1)).
publish_delivered(false, _Msg, State = #vqstate { len = 0 }) ->
{blank_ack, a(State)};
@@ -1056,97 +1055,30 @@ reduce_memory_use(State = #vqstate {
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
-msg_storage_type(_SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
- ram_msg_count = RamMsgCount })
- when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount > RamMsgCount ->
- msg;
-msg_storage_type( SeqId, #vqstate { target_ram_msg_count = 0, q3 = Q3 }) ->
- case bpqueue:out(Q3) of
- {empty, _Q3} ->
- %% if TargetRamMsgCount == 0, we know we have no
- %% alphas. If q3 is empty then delta must be empty too, so
- %% create a beta, which should end up in q3
- index;
- {{value, _IndexOnDisk, #msg_status { seq_id = OldSeqId }}, _Q3a} ->
- %% Don't look at the current delta as it may be empty. If
- %% the SeqId is still within the current segment, it'll be
- %% a beta, else it'll go into delta
- case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of
- true -> neither;
- false -> index
- end
- end;
-msg_storage_type(_SeqId, #vqstate { q1 = Q1 }) ->
- case queue:is_empty(Q1) of
- true -> index;
- %% Can push out elders (in q1) to disk. This may also result
- %% in the msg itself going to disk and q2/q3.
- false -> msg
- end.
-
publish(Msg = #basic_message { is_persistent = IsPersistent },
IsDelivered, MsgOnDisk,
- State = #vqstate { next_seq_id = SeqId,
+ State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
+ next_seq_id = SeqId,
len = Len,
in_counter = InCount,
persistent_count = PCount,
- durable = IsDurable }) ->
+ durable = IsDurable,
+ ram_msg_count = RamMsgCount }) ->
IsPersistent1 = IsDurable andalso IsPersistent,
MsgStatus = (msg_status(IsPersistent1, SeqId, Msg))
#msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk },
+ {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State),
+ State2 = case bpqueue:is_empty(Q3) of
+ false -> State1 #vqstate { q1 = queue:in(MsgStatus1, Q1) };
+ true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) }
+ end,
PCount1 = PCount + one_if(IsPersistent1),
- {SeqId, publish(msg_storage_type(SeqId, State), MsgStatus,
- State #vqstate { next_seq_id = SeqId + 1,
- len = Len + 1,
- in_counter = InCount + 1,
- persistent_count = PCount1 })}.
-
-publish(msg, MsgStatus, State) ->
- {MsgStatus1, State1 = #vqstate { ram_msg_count = RamMsgCount }} =
- maybe_write_to_disk(false, false, MsgStatus, State),
- State2 = State1 # vqstate {ram_msg_count = RamMsgCount + 1 },
- store_alpha_entry(MsgStatus1, State2);
-
-publish(index, MsgStatus, State) ->
- ForceIndex = should_force_index_to_disk(State),
- {MsgStatus1 = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { ram_index_count = RamIndexCount, q1 = Q1 }} =
- maybe_write_to_disk(true, ForceIndex, MsgStatus, State),
- RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_index_count = RamIndexCount1 },
- true = queue:is_empty(Q1), %% ASSERTION
- store_beta_entry(MsgStatus1, State2);
-
-publish(neither, MsgStatus, State) ->
- {#msg_status { msg_on_disk = true, index_on_disk = true, seq_id = SeqId },
- State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} =
- maybe_write_to_disk(true, true, MsgStatus, State),
- true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION
- Delta1 = #delta { start_seq_id = SeqId,
- count = 1,
- end_seq_id = SeqId + 1 },
- State1 #vqstate { delta = combine_deltas(Delta, Delta1) }.
-
-store_alpha_entry(MsgStatus, State = #vqstate {q1 = Q1, q3 = Q3, q4 = Q4 }) ->
- case bpqueue:is_empty(Q3) of
- true -> State #vqstate { q4 = queue:in(MsgStatus, Q4) };
- false -> maybe_push_q1_to_betas(
- State #vqstate { q1 = queue:in(MsgStatus, Q1) })
- end.
-
-store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true,
- index_on_disk = IndexOnDisk },
- State = #vqstate { q2 = Q2,
- delta = #delta { count = DeltaCount },
- q3 = Q3 }) ->
- MsgStatus1 = MsgStatus #msg_status { msg = undefined },
- case DeltaCount == 0 of
- true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1,
- Q3) };
- false -> State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1,
- Q2) }
- end.
+ {SeqId, reduce_memory_use(
+ State2 #vqstate { next_seq_id = SeqId + 1,
+ len = Len + 1,
+ in_counter = InCount + 1,
+ persistent_count = PCount1,
+ ram_msg_count = RamMsgCount + 1})}.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
msg_on_disk = true }, MSCState) ->
@@ -1291,9 +1223,20 @@ maybe_deltas_to_betas(State = #vqstate {
maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) ->
maybe_push_alphas_to_betas(
fun queue:out/1,
- fun (MsgStatus, Q1a, State1) ->
- %% these could legally go to q3 if delta and q2 are empty
- store_beta_entry(MsgStatus, State1 #vqstate { q1 = Q1a })
+ fun (MsgStatus = #msg_status { msg_on_disk = true,
+ index_on_disk = IndexOnDisk },
+ Q1a, State1 = #vqstate { q2 = Q2,
+ delta = #delta { count = DeltaCount },
+ q3 = Q3 }) ->
+ MsgStatus1 = MsgStatus #msg_status { msg = undefined },
+ case DeltaCount == 0 of
+ true -> State1 #vqstate {
+ q1 = Q1a,
+ q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) };
+ false -> State1 #vqstate {
+ q1 = Q1a,
+ q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) }
+ end
end, Q1, State).
maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) ->