diff options
| author | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 13:32:06 +0100 |
|---|---|---|
| committer | Matthias Radestock <matthias@rabbitmq.com> | 2010-06-22 13:32:06 +0100 |
| commit | fafcc3051120e22f610726b1ea0739be18681c78 (patch) | |
| tree | 8c38ff4c6b2c6e069779b8947d693dc326f79f2c | |
| parent | f1f88f65323d694ea68366fd8b0dfe3f42b84a61 (diff) | |
| download | rabbitmq-server-git-fafcc3051120e22f610726b1ea0739be18681c78.tar.gz | |
massivly simplify variable_queue:publish
Rather than trying to figure out which of q{1-4} or delta a message
should go into, we make a straightforward decision to stuff it into
either q1 or q4 and then use the existing logic for dealing with
memory pressure to shuffle it to the right place.
| -rw-r--r-- | src/rabbit_variable_queue.erl | 119 |
1 files changed, 31 insertions, 88 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 23879df780..bd91283750 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -426,9 +426,8 @@ purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len }) -> persistent_count = 0 })}. publish(Msg, State) -> - State1 = limit_ram_index(State), - {_SeqId, State2} = publish(Msg, false, false, State1), - a(State2). + {_SeqId, State1} = publish(Msg, false, false, State), + a(limit_ram_index(State1)). publish_delivered(false, _Msg, State = #vqstate { len = 0 }) -> {blank_ack, a(State)}; @@ -1056,97 +1055,30 @@ reduce_memory_use(State = #vqstate { %% Internal gubbins for publishing %%---------------------------------------------------------------------------- -msg_storage_type(_SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, - ram_msg_count = RamMsgCount }) - when TargetRamMsgCount =:= infinity orelse TargetRamMsgCount > RamMsgCount -> - msg; -msg_storage_type( SeqId, #vqstate { target_ram_msg_count = 0, q3 = Q3 }) -> - case bpqueue:out(Q3) of - {empty, _Q3} -> - %% if TargetRamMsgCount == 0, we know we have no - %% alphas. If q3 is empty then delta must be empty too, so - %% create a beta, which should end up in q3 - index; - {{value, _IndexOnDisk, #msg_status { seq_id = OldSeqId }}, _Q3a} -> - %% Don't look at the current delta as it may be empty. If - %% the SeqId is still within the current segment, it'll be - %% a beta, else it'll go into delta - case SeqId >= rabbit_queue_index:next_segment_boundary(OldSeqId) of - true -> neither; - false -> index - end - end; -msg_storage_type(_SeqId, #vqstate { q1 = Q1 }) -> - case queue:is_empty(Q1) of - true -> index; - %% Can push out elders (in q1) to disk. This may also result - %% in the msg itself going to disk and q2/q3. - false -> msg - end. - publish(Msg = #basic_message { is_persistent = IsPersistent }, IsDelivered, MsgOnDisk, - State = #vqstate { next_seq_id = SeqId, + State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4, + next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount, - durable = IsDurable }) -> + durable = IsDurable, + ram_msg_count = RamMsgCount }) -> IsPersistent1 = IsDurable andalso IsPersistent, MsgStatus = (msg_status(IsPersistent1, SeqId, Msg)) #msg_status { is_delivered = IsDelivered, msg_on_disk = MsgOnDisk }, + {MsgStatus1, State1} = maybe_write_to_disk(false, false, MsgStatus, State), + State2 = case bpqueue:is_empty(Q3) of + false -> State1 #vqstate { q1 = queue:in(MsgStatus1, Q1) }; + true -> State1 #vqstate { q4 = queue:in(MsgStatus1, Q4) } + end, PCount1 = PCount + one_if(IsPersistent1), - {SeqId, publish(msg_storage_type(SeqId, State), MsgStatus, - State #vqstate { next_seq_id = SeqId + 1, - len = Len + 1, - in_counter = InCount + 1, - persistent_count = PCount1 })}. - -publish(msg, MsgStatus, State) -> - {MsgStatus1, State1 = #vqstate { ram_msg_count = RamMsgCount }} = - maybe_write_to_disk(false, false, MsgStatus, State), - State2 = State1 # vqstate {ram_msg_count = RamMsgCount + 1 }, - store_alpha_entry(MsgStatus1, State2); - -publish(index, MsgStatus, State) -> - ForceIndex = should_force_index_to_disk(State), - {MsgStatus1 = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { ram_index_count = RamIndexCount, q1 = Q1 }} = - maybe_write_to_disk(true, ForceIndex, MsgStatus, State), - RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk), - State2 = State1 #vqstate { ram_index_count = RamIndexCount1 }, - true = queue:is_empty(Q1), %% ASSERTION - store_beta_entry(MsgStatus1, State2); - -publish(neither, MsgStatus, State) -> - {#msg_status { msg_on_disk = true, index_on_disk = true, seq_id = SeqId }, - State1 = #vqstate { q1 = Q1, q2 = Q2, delta = Delta }} = - maybe_write_to_disk(true, true, MsgStatus, State), - true = queue:is_empty(Q1) andalso bpqueue:is_empty(Q2), %% ASSERTION - Delta1 = #delta { start_seq_id = SeqId, - count = 1, - end_seq_id = SeqId + 1 }, - State1 #vqstate { delta = combine_deltas(Delta, Delta1) }. - -store_alpha_entry(MsgStatus, State = #vqstate {q1 = Q1, q3 = Q3, q4 = Q4 }) -> - case bpqueue:is_empty(Q3) of - true -> State #vqstate { q4 = queue:in(MsgStatus, Q4) }; - false -> maybe_push_q1_to_betas( - State #vqstate { q1 = queue:in(MsgStatus, Q1) }) - end. - -store_beta_entry(MsgStatus = #msg_status { msg_on_disk = true, - index_on_disk = IndexOnDisk }, - State = #vqstate { q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3 }) -> - MsgStatus1 = MsgStatus #msg_status { msg = undefined }, - case DeltaCount == 0 of - true -> State #vqstate { q3 = bpqueue:in(IndexOnDisk, MsgStatus1, - Q3) }; - false -> State #vqstate { q2 = bpqueue:in(IndexOnDisk, MsgStatus1, - Q2) } - end. + {SeqId, reduce_memory_use( + State2 #vqstate { next_seq_id = SeqId + 1, + len = Len + 1, + in_counter = InCount + 1, + persistent_count = PCount1, + ram_msg_count = RamMsgCount + 1})}. maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status { msg_on_disk = true }, MSCState) -> @@ -1291,9 +1223,20 @@ maybe_deltas_to_betas(State = #vqstate { maybe_push_q1_to_betas(State = #vqstate { q1 = Q1 }) -> maybe_push_alphas_to_betas( fun queue:out/1, - fun (MsgStatus, Q1a, State1) -> - %% these could legally go to q3 if delta and q2 are empty - store_beta_entry(MsgStatus, State1 #vqstate { q1 = Q1a }) + fun (MsgStatus = #msg_status { msg_on_disk = true, + index_on_disk = IndexOnDisk }, + Q1a, State1 = #vqstate { q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3 }) -> + MsgStatus1 = MsgStatus #msg_status { msg = undefined }, + case DeltaCount == 0 of + true -> State1 #vqstate { + q1 = Q1a, + q3 = bpqueue:in(IndexOnDisk, MsgStatus1, Q3) }; + false -> State1 #vqstate { + q1 = Q1a, + q2 = bpqueue:in(IndexOnDisk, MsgStatus1, Q2) } + end end, Q1, State). maybe_push_q4_to_betas(State = #vqstate { q4 = Q4 }) -> |
