diff options
| author | Rob Harrop <rharrop@vmware.com> | 2010-09-22 12:51:59 +0100 |
|---|---|---|
| committer | Rob Harrop <rharrop@vmware.com> | 2010-09-22 12:51:59 +0100 |
| commit | 1e99c1bd6166ec792a9963709d28cd6c46a73dba (patch) | |
| tree | 39f0199c6d1710edc3894231d5ab9c50119926f5 | |
| parent | 33fa8a2e3207e2ba240b79efac0dd72e6bec76a3 (diff) | |
| download | rabbitmq-server-git-1e99c1bd6166ec792a9963709d28cd6c46a73dba.tar.gz | |
fixed problem in vq:tx_commit, moved expiry recalculation to post_msg_store callback
| -rw-r--r-- | src/rabbit_variable_queue.erl | 29 |
1 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 6dc24e80b9..202f2c99d2 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -613,22 +613,18 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) -> tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) -> #tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn), erase_tx(Txn), - PubsOrdered = lists:foldl( - fun ({Msg, MsgProps}, Acc) -> - [{Msg, MsgPropsFun(MsgProps)} | Acc] - end, [], Pubs), AckTags1 = lists:append(AckTags), - PersistentGuids = persistent_guids(PubsOrdered), + PersistentGuids = persistent_guids(Pubs), HasPersistentPubs = PersistentGuids =/= [], {AckTags1, a(case IsDurable andalso HasPersistentPubs of true -> ok = rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, PersistentGuids, - msg_store_callback(PersistentGuids, - PubsOrdered, AckTags1, Fun)), + msg_store_callback(PersistentGuids,Pubs, AckTags1, + Fun, MsgPropsFun)), State; - false -> tx_commit_post_msg_store( - HasPersistentPubs, PubsOrdered, AckTags1, Fun, State) + false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1, + Fun, MsgPropsFun, State) end)}. requeue(AckTags, MsgPropsFun, State) -> @@ -901,11 +897,12 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> +msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) -> Self = self(), F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( - true, Pubs, AckTags, Fun, StateN) + true, Pubs, AckTags, + Fun, MsgPropsFun, StateN) end) end, fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler( @@ -916,7 +913,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) -> end) end. -tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, +tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun, State = #vqstate { on_sync = OnSync = #sync { acks_persistent = SPAcks, @@ -935,17 +932,21 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, end]; false -> [] end, + PubsOrdered = lists:foldl( + fun ({Msg, MsgProps}, Acc) -> + [{Msg, MsgPropsFun(MsgProps)} | Acc] + end, [], Pubs), case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of true -> State #vqstate { on_sync = #sync { acks_persistent = [PersistentAcks | SPAcks], acks_all = [AckTags | SAcks], - pubs = [Pubs | SPubs], + pubs = [PubsOrdered | SPubs], funs = [Fun | SFuns] }}; false -> State1 = tx_commit_index( State #vqstate { on_sync = #sync { acks_persistent = [], acks_all = [AckTags], - pubs = [Pubs], + pubs = [PubsOrdered], funs = [Fun] } }), State1 #vqstate { on_sync = OnSync } end. |
