diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-19 16:19:56 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-19 16:19:56 +0100 |
| commit | 7531d0f281c1a7a652336e31dbcaeb502b6715ab (patch) | |
| tree | 89d5b2cab75f600c56233eae919306530090e395 | |
| parent | 5ea831a8b641f8282f375972a65038d85440760b (diff) | |
| download | rabbitmq-server-git-7531d0f281c1a7a652336e31dbcaeb502b6715ab.tar.gz | |
Lots of fixes for handling of acktags, which was badly broken in some places
| -rw-r--r-- | src/rabbit_variable_queue.erl | 72 |
1 files changed, 35 insertions, 37 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 1a609dcc99..9328164b64 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -201,7 +201,7 @@ -type(bpqueue() :: any()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack', seq_id()} | 'blank_ack'). +-type(ack() :: seq_id() | 'blank_ack'). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer (), @@ -405,7 +405,7 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1 }, - {{ack, SeqId}, + {SeqId, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = @@ -431,7 +431,7 @@ fetch(AckRequired, State = Q4a} -> AckTag = case AckRequired of - true -> {ack, SeqId}; + true -> SeqId; false -> blank_ack end, @@ -501,7 +501,7 @@ ack(AckTags, State = #vqstate { index_state = IndexState, pending_ack = PA }) -> {GuidsByStore, SeqIds, PA1} = lists:foldl( - fun ({ack, SeqId}, {Dict, SeqIds, PAN}) -> + fun (SeqId, {Dict, SeqIds, PAN}) -> PAN1 = dict:erase(SeqId, PAN), case dict:find(SeqId, PAN) of {ok, #msg_status { index_on_disk = false, %% ASSERTIONS @@ -580,10 +580,9 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun ({ack, SeqId}, - {SeqIdsAcc, Dict, StateN = - #vqstate { msg_store_clients = MSCStateN, - pending_ack = PAN}}) -> + fun (SeqId, {SeqIdsAcc, Dict, StateN = + #vqstate { msg_store_clients = MSCStateN, + pending_ack = PAN}}) -> PAN1 = dict:erase(SeqId, PAN), StateN1 = StateN #vqstate { pending_ack = PAN1 }, case dict:find(SeqId, PAN) of @@ -715,28 +714,27 @@ remove_pending_ack(KeepPersistent, State = #vqstate { pending_ack = PA, persistent_store = PersistentStore, index_state = IndexState }) -> - {SeqIds, GuidsByStore} = - dict:fold(fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> - case IsPersistent of - true -> {[SeqId | SeqIdsAcc], - rabbit_misc:dict_cons( - PersistentStore, Guid, Dict)}; - false -> {SeqIdsAcc, - rabbit_misc:dict_cons( - ?TRANSIENT_MSG_STORE, Guid, Dict)} - end; - (_SeqId, #msg_status {}, Acc) -> - Acc - end, {[], dict:new()}, PA), + {SeqIds, GuidsByStore, PA1} = + dict:fold( + fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict, PAN}) -> + PAN1 = case KeepPersistent andalso IsPersistent of + true -> PAN; + false -> dict:erase(SeqId, PAN) + end, + case IsPersistent of + true -> {[SeqId | SeqIdsAcc], + rabbit_misc:dict_cons( + PersistentStore, Guid, Dict), PAN1}; + false -> {SeqIdsAcc, + rabbit_misc:dict_cons( + ?TRANSIENT_MSG_STORE, Guid, Dict), PAN1} + end; + (SeqId, #msg_status {}, {SeqIdsAcc, Dict, PAN}) -> + {SeqIdsAcc, Dict, dict:erase(SeqId, PAN)} + end, {[], dict:new(), PA}, PA), case KeepPersistent of true -> - State1 = - State #vqstate { - pending_ack = - dict:filter( - fun (_SeqId, {IsPersistent, _Guid}) -> IsPersistent; - (_SeqId, #msg_status {}) -> false - end, PA) }, + State1 = State #vqstate { pending_ack = PA1 }, case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of error -> State1; {ok, Guids} -> ok = rabbit_msg_store:remove( @@ -911,9 +909,14 @@ tx_commit_post_msg_store(IsTransientPubs, Pubs, AckTags, From, State = %% persistent acks) then we can skip the queue_index loop. case PersistentStore == ?TRANSIENT_MSG_STORE orelse (IsTransientPubs andalso - lists:foldl(fun (AckTag, true ) -> dict:is_key(AckTag, PA); - (_AckTag, false) -> false - end, true, AckTags)) of + lists:foldl( + fun (AckTag, true ) -> + case dict:find(AckTag, PA) of + {ok, #msg_status{}} -> true; + {ok, {IsPersistent, _Guid}} -> not IsPersistent + end; + (_AckTag, false) -> false + end, true, AckTags)) of true -> State1 = tx_commit_index(State #vqstate { on_sync = {[], [Pubs], [From]} }), State1 #vqstate { on_sync = OnSync }; @@ -928,11 +931,6 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), - AckSeqIds = lists:foldl(fun ({ack, SeqId, _Guid, true}, SeqIdsAcc) -> - [SeqId | SeqIdsAcc]; - (_, SeqIdsAcc) -> - SeqIdsAcc - end, [], Acks), IsPersistentStore = ?PERSISTENT_MSG_STORE == PersistentStore, Pubs = lists:flatten(lists:reverse(SPubs)), {SeqIds, State2 = #vqstate { index_state = IndexState }} = @@ -945,7 +943,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, true -> [SeqId | SeqIdsAcc]; false -> SeqIdsAcc end, StateN1} - end, {AckSeqIds, State1}, Pubs), + end, {Acks, State1}, Pubs), IndexState1 = rabbit_queue_index:sync_seq_ids(SeqIds, IndexState), [ gen_server2:reply(From, ok) || From <- lists:reverse(SFroms) ], |
