summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorRob Harrop <rharrop@vmware.com>2010-09-22 12:51:59 +0100
committerRob Harrop <rharrop@vmware.com>2010-09-22 12:51:59 +0100
commit1e99c1bd6166ec792a9963709d28cd6c46a73dba (patch)
tree39f0199c6d1710edc3894231d5ab9c50119926f5
parent33fa8a2e3207e2ba240b79efac0dd72e6bec76a3 (diff)
downloadrabbitmq-server-git-1e99c1bd6166ec792a9963709d28cd6c46a73dba.tar.gz
fixed problem in vq:tx_commit, moved expiry recalculation to post_msg_store callback
-rw-r--r--src/rabbit_variable_queue.erl29
1 files changed, 15 insertions, 14 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 6dc24e80b9..202f2c99d2 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -613,22 +613,18 @@ tx_rollback(Txn, State = #vqstate { durable = IsDurable }) ->
tx_commit(Txn, Fun, MsgPropsFun, State = #vqstate { durable = IsDurable }) ->
#tx { pending_acks = AckTags, pending_messages = Pubs } = lookup_tx(Txn),
erase_tx(Txn),
- PubsOrdered = lists:foldl(
- fun ({Msg, MsgProps}, Acc) ->
- [{Msg, MsgPropsFun(MsgProps)} | Acc]
- end, [], Pubs),
AckTags1 = lists:append(AckTags),
- PersistentGuids = persistent_guids(PubsOrdered),
+ PersistentGuids = persistent_guids(Pubs),
HasPersistentPubs = PersistentGuids =/= [],
{AckTags1,
a(case IsDurable andalso HasPersistentPubs of
true -> ok = rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE, PersistentGuids,
- msg_store_callback(PersistentGuids,
- PubsOrdered, AckTags1, Fun)),
+ msg_store_callback(PersistentGuids,Pubs, AckTags1,
+ Fun, MsgPropsFun)),
State;
- false -> tx_commit_post_msg_store(
- HasPersistentPubs, PubsOrdered, AckTags1, Fun, State)
+ false -> tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags1,
+ Fun, MsgPropsFun, State)
end)}.
requeue(AckTags, MsgPropsFun, State) ->
@@ -901,11 +897,12 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
%% Internal major helpers for Public API
%%----------------------------------------------------------------------------
-msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
+msg_store_callback(PersistentGuids, Pubs, AckTags, Fun, MsgPropsFun) ->
Self = self(),
F = fun () -> rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self, fun (StateN) -> tx_commit_post_msg_store(
- true, Pubs, AckTags, Fun, StateN)
+ true, Pubs, AckTags,
+ Fun, MsgPropsFun, StateN)
end)
end,
fun () -> spawn(fun () -> ok = rabbit_misc:with_exit_handler(
@@ -916,7 +913,7 @@ msg_store_callback(PersistentGuids, Pubs, AckTags, Fun) ->
end)
end.
-tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
+tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun, MsgPropsFun,
State = #vqstate {
on_sync = OnSync = #sync {
acks_persistent = SPAcks,
@@ -935,17 +932,21 @@ tx_commit_post_msg_store(HasPersistentPubs, Pubs, AckTags, Fun,
end];
false -> []
end,
+ PubsOrdered = lists:foldl(
+ fun ({Msg, MsgProps}, Acc) ->
+ [{Msg, MsgPropsFun(MsgProps)} | Acc]
+ end, [], Pubs),
case IsDurable andalso (HasPersistentPubs orelse PersistentAcks =/= []) of
true -> State #vqstate { on_sync = #sync {
acks_persistent = [PersistentAcks | SPAcks],
acks_all = [AckTags | SAcks],
- pubs = [Pubs | SPubs],
+ pubs = [PubsOrdered | SPubs],
funs = [Fun | SFuns] }};
false -> State1 = tx_commit_index(
State #vqstate { on_sync = #sync {
acks_persistent = [],
acks_all = [AckTags],
- pubs = [Pubs],
+ pubs = [PubsOrdered],
funs = [Fun] } }),
State1 #vqstate { on_sync = OnSync }
end.