diff options
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
1 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6dc24e80b9..202f2c99d2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -613,22 +613,18 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:foldl( - fun ({Msg, MsgProps}, Acc) -> - [{Msg, MsgPropsFun(MsgProps)} | Acc] - end, [], Pubs), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(PubsOrdered), + PersistentGuids = persistent_guids(Pubs), HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, - PubsOrdered, AckTags1, Fun)), + msg_store_callback(PersistentGuids,Pubs, AckTags1, + Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store( - HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + Fun, MsgPropsFun, State) end)}. requeue(AckTags, MsgPropsFun, State) -> @@ -901,11 +897,12 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, Fun, StateN) + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -916,7 +913,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> end) end. -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { acks_persistent = SPAcks, @@ -935,17 +932,21 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, end]; false -> [] end, + PubsOrdered = lists:foldl( + fun ({Msg, MsgProps}, Acc) -> + [{Msg, MsgPropsFun(MsgProps)} | Acc] + end, [], Pubs), case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of true -> State #vqstate { on_sync = #sync { acks_persistent = [PersistentAcks | SPAcks], acks_all = [AckTags | SAcks], - pubs = [Pubs | SPubs], + pubs = [PubsOrdered | SPubs], funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( State #vqstate { on_sync = #sync { acks_persistent = [], acks_all = [AckTags], - pubs = [Pubs], + pubs = [PubsOrdered], funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. |
