diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 57 |
1 files changed, 31 insertions, 26 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 05e540c0bf..3cfebb50ef 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -620,12 +620,8 @@ internal_fetch(AckRequired, Len1 = Len - 1, RamMsgCount1 = RamMsgCount - one_if(Msg =/= undefined), - RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), - true = RamIndexCount1 >= 0, %% ASSERTION - {{Msg, IsDelivered, AckTag, Len1}, a(State #vqstate { ram_msg_count = RamMsgCount1, - ram_index_count = RamIndexCount1, out_counter = OutCount + 1, index_state = IndexState2, len = Len1, @@ -987,22 +983,20 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, end]; false -> [] end, - PubsOrdered = lists:foldl( - fun ({Msg, MsgProps}, Acc) -> - [{Msg, MsgPropsFun(MsgProps)} | Acc] - end, [], Pubs), case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of - true -> State #vqstate { on_sync = #sync { - acks_persistent = [PersistentAcks | SPAcks], - acks_all = [AckTags | SAcks], - pubs = [PubsOrdered | SPubs], - funs = [Fun | SFuns] }}; + true -> State #vqstate { + on_sync = #sync { + acks_persistent = [PersistentAcks | SPAcks], + acks_all = [AckTags | SAcks], + pubs = [{MsgPropsFun, Pubs} | SPubs], + funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( - State #vqstate { on_sync = #sync { - acks_persistent = [], - acks_all = [AckTags], - pubs = [PubsOrdered], - funs = [Fun] } }), + State #vqstate { + on_sync = #sync { + acks_persistent = [], + acks_all = [AckTags], + pubs = [{MsgPropsFun, Pubs}], + funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. @@ -1016,7 +1010,13 @@ tx_commit_index(State = #vqstate { on_sync = #sync { durable = IsDurable }) -> PAcks = lists:append(SPAcks), Acks = lists:append(SAcks), - Pubs = lists:append(lists:reverse(SPubs)), + Pubs = lists:foldl( + fun({Fun, PubsN}, OuterAcc) -> + lists:foldl( + fun({Msg, MsgProps}, Acc) -> + [{Msg, Fun(MsgProps)} | Acc] + end, OuterAcc, PubsN) + end, [], SPubs), {SeqIds, State1 = #vqstate { index_state = IndexState }} = lists:foldl( fun ({Msg = #basic_message { is_persistent = IsPersistent }, @@ -1325,16 +1325,21 @@ chunk_size(Current, Permitted) -> lists:min([Current - Permitted, ?IO_BATCH_SIZE]). fetch_from_q3(State = #vqstate { - q1 = Q1, - q2 = Q2, - delta = #delta { count = DeltaCount }, - q3 = Q3, - q4 = Q4 }) -> + q1 = Q1, + q2 = Q2, + delta = #delta { count = DeltaCount }, + q3 = Q3, + q4 = Q4, + ram_index_count = RamIndexCount}) -> case bpqueue:out(Q3) of {empty, _Q3} -> {empty, State}; - {{value, _IndexOnDisk, MsgStatus}, Q3a} -> - State1 = State #vqstate { q3 = Q3a }, + {{value, IndexOnDisk, MsgStatus}, Q3a} -> + RamIndexCount1 = RamIndexCount - one_if(not IndexOnDisk), + true = RamIndexCount1 >= 0, %% ASSERTION + + State1 = State #vqstate { q3 = Q3a, + ram_index_count = RamIndexCount1 }, State2 = case {bpqueue:is_empty(Q3a), 0 == DeltaCount} of {true, true} -> |
