diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-16 14:02:22 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-16 14:02:22 +0100 |
| commit | c0ba7b2d281834605eb60c67b5cfa97d6c24b759 (patch) | |
| tree | f4d4ec6223e83a8c8440099568f6ab90a4ed0881 | |
| parent | 67b986d695826fe3d906f95e0b886a26790c75c2 (diff) | |
| download | rabbitmq-server-git-c0ba7b2d281834605eb60c67b5cfa97d6c24b759.tar.gz | |
Changes to the way we track messages that are pending acks to ensure that on queue deletion we remove everything from the message stores that shouldn't be there
| -rw-r--r-- | src/rabbit_variable_queue.erl | 166 |
1 files changed, 107 insertions, 59 deletions
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index aa1589a6d0..7cebf2b1bf 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -201,7 +201,7 @@ -type(bpqueue() :: any()). -type(seq_id() :: non_neg_integer()). --type(ack() :: {'ack', seq_id(), guid(), boolean()} | 'blank_ack'). +-type(ack() :: {'ack', seq_id()} | 'blank_ack'). -type(delta() :: #delta { start_seq_id :: non_neg_integer(), count :: non_neg_integer (), @@ -229,7 +229,7 @@ rate_timestamp :: {integer(), integer(), integer()}, len :: non_neg_integer(), on_sync :: {[[ack()]], [[guid()]], [{pid(), any()}]}, - msg_store_clients :: {{any(), binary()}, {any(), binary()}}, + msg_store_clients :: 'undefined' | {{any(), binary()}, {any(), binary()}}, persistent_store :: pid() | atom(), persistent_count :: non_neg_integer(), transient_threshold :: non_neg_integer() @@ -323,23 +323,25 @@ terminate(State) -> State1 = #vqstate { persistent_count = PCount, index_state = IndexState, msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}} } = - tx_commit_index(State), + remove_pending_ack(true, tx_commit_index(State)), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), Terms = [{persistent_ref, PRef}, {transient_ref, TRef}, {persistent_count, PCount}], State1 #vqstate { index_state = - rabbit_queue_index:terminate(Terms, IndexState) }. + rabbit_queue_index:terminate(Terms, IndexState), + msg_store_clients = undefined }. %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(State) -> - {_PurgeCount, State1 = #vqstate { - index_state = IndexState, - msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, - persistent_store = PersistentStore, - transient_threshold = TransientThreshold }} = - purge(State), + {_PurgeCount, State1} = purge(State), + State2 = #vqstate { + index_state = IndexState, + msg_store_clients = {{MSCStateP, PRef}, {MSCStateT, TRef}}, + persistent_store = PersistentStore, + transient_threshold = TransientThreshold } = + remove_pending_ack(false, State1), %% flushing here is good because it deletes all full segments, %% leaving only partial segments around. IndexState1 = rabbit_queue_index:flush_journal(IndexState), @@ -359,7 +361,8 @@ delete_and_terminate(State) -> rabbit_msg_store:delete_client(?TRANSIENT_MSG_STORE, TRef), rabbit_msg_store:client_terminate(MSCStateP), rabbit_msg_store:client_terminate(MSCStateT), - State1 #vqstate { index_state = IndexState5 }. + State2 #vqstate { index_state = IndexState5, + msg_store_clients = undefined }. purge(State = #vqstate { q4 = Q4, index_state = IndexState, len = Len, persistent_store = PersistentStore }) -> @@ -402,16 +405,16 @@ publish_delivered(true, Msg = #basic_message { guid = Guid, next_seq_id = SeqId + 1, out_counter = OutCount + 1, in_counter = InCount + 1 }, - AckTag = {ack, SeqId, Guid, IsPersistent}, - {AckTag, + {{ack, SeqId}, case MsgStatus1 #msg_status.msg_on_disk of true -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), - State1 #vqstate { index_state = IndexState1 }; + State1 #vqstate { index_state = IndexState1, + pending_ack = dict:store(SeqId, {true, Guid}, + PA) }; false -> - State1 #vqstate { pending_ack = - dict:store(AckTag, MsgStatus1, PA) } + State1 #vqstate { pending_ack = dict:store(SeqId, MsgStatus1, PA) } end}. fetch(AckRequired, State = @@ -428,7 +431,7 @@ fetch(AckRequired, State = Q4a} -> AckTag = case AckRequired of - true -> {ack, SeqId, Guid, IsPersistent}; + true -> {ack, SeqId}; false -> blank_ack end, @@ -444,17 +447,17 @@ fetch(AckRequired, State = IndexState2 = case MsgOnDisk andalso not AckRequired of true -> %% Remove from disk now + ok = case MsgOnDisk of + true -> + rabbit_msg_store:remove(MsgStore, [Guid]); + false -> + ok + end, case IndexOnDisk of true -> - ok = rabbit_msg_store:remove(MsgStore, [Guid]), rabbit_queue_index:write_acks([SeqId], IndexState1); false -> - ok = case MsgOnDisk of - true -> rabbit_msg_store:remove( - MsgStore, [Guid]); - false -> ok - end, IndexState1 end; false -> @@ -469,10 +472,16 @@ fetch(AckRequired, State = false -> IndexState2 end, - %% 4. If it's not on disk and we need an Ack, add it to PA - PA1 = case AckRequired andalso not MsgOnDisk of - true -> dict:store(AckTag, MsgStatus #msg_status { - is_delivered = true }, PA); + %% 4. If an ack is required, add something sensible to PA + PA1 = case AckRequired of + true -> + Entry = + case MsgOnDisk of + true -> {IsPersistent, Guid}; + false -> MsgStatus #msg_status { + is_delivered = true } + end, + dict:store(SeqId, Entry, PA); false -> PA end, @@ -492,21 +501,19 @@ ack(AckTags, State = #vqstate { index_state = IndexState, pending_ack = PA }) -> {GuidsByStore, SeqIds, PA1} = lists:foldl( - fun (blank_ack, Acc) -> Acc; - ({ack, SeqId, Guid, true}, {Dict, SeqIds, PAN}) -> - {rabbit_misc:dict_cons(PersistentStore, Guid, Dict), - [SeqId | SeqIds], PAN}; - ({ack, _SeqId, Guid, false} = AckTag, {Dict, SeqIds, PAN}) -> - case dict:find(AckTag, PAN) of - error -> - %% must be in the transient store and won't - %% be in the queue index. - {rabbit_misc:dict_cons( - ?TRANSIENT_MSG_STORE, Guid, Dict), SeqIds, PAN}; + fun ({ack, SeqId}, {Dict, SeqIds, PAN}) -> + PAN1 = dict:erase(SeqId, PAN), + case dict:find(SeqId, PAN) of {ok, #msg_status { index_on_disk = false, %% ASSERTIONS msg_on_disk = false, is_persistent = false }} -> - {Dict, SeqIds, dict:erase(AckTag, PAN)} + {Dict, SeqIds, PAN1}; + {ok, {false, Guid}} -> + {rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, + Dict), SeqIds, PAN1}; + {ok, {true, Guid}} -> + {rabbit_misc:dict_cons(PersistentStore, Guid, Dict), + [SeqId | SeqIds], PAN1} end end, {dict:new(), [], PA}, AckTags), IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), @@ -568,23 +575,31 @@ tx_commit(Txn, From, State = #vqstate { persistent_store = PersistentStore }) -> State end}. -requeue(AckTags, State = #vqstate { persistent_store = PersistentStore, - pending_ack = PA }) -> +requeue(AckTags, State = #vqstate { persistent_store = PersistentStore }) -> {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, persistent_count = PCount }} = lists:foldl( - fun ({ack, SeqId, Guid, IsPersistent} = AckTag, - {SeqIdsAcc, Dict, StateN = #vqstate { - msg_store_clients = MSCStateN }}) -> - case dict:find(AckTag, PA) of - error -> + fun ({ack, SeqId}, + {SeqIdsAcc, Dict, StateN = + #vqstate { msg_store_clients = MSCStateN, + pending_ack = PAN}}) -> + PAN1 = dict:erase(SeqId, PAN), + StateN1 = StateN #vqstate { pending_ack = PAN1 }, + case dict:find(SeqId, PAN) of + {ok, #msg_status { index_on_disk = false, + msg_on_disk = false, + is_persistent = false, + msg = Msg }} -> + {_SeqId, StateN2} = publish(Msg, true, false, StateN1), + {SeqIdsAcc, Dict, StateN2}; + {ok, {IsPersistent, Guid}} -> {{ok, Msg = #basic_message{}}, MSCStateN1} = - read_from_msg_store(PersistentStore, MSCStateN, - IsPersistent, Guid), - StateN1 = StateN #vqstate { + read_from_msg_store( + PersistentStore, MSCStateN, IsPersistent, Guid), + StateN2 = StateN1 #vqstate { msg_store_clients = MSCStateN1 }, - {_SeqId, StateN2} = publish(Msg, true, true, StateN1), + {_SeqId, StateN3} = publish(Msg, true, true, StateN2), {SeqIdsAcc1, MsgStore} = case IsPersistent of true -> @@ -594,15 +609,7 @@ requeue(AckTags, State = #vqstate { persistent_store = PersistentStore, end, {SeqIdsAcc1, rabbit_misc:dict_cons(MsgStore, Guid, Dict), - StateN2}; - {ok, #msg_status { index_on_disk = false, - msg_on_disk = false, - is_persistent = false, - msg = Msg }} -> - {_SeqId, StateN1} = publish(Msg, true, false, StateN), - {SeqIdsAcc, Dict, - StateN1 #vqstate { - pending_ack = dict:erase(AckTag, PA) }} + StateN3} end end, {[], dict:new(), State}, AckTags), IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), @@ -704,6 +711,47 @@ status(#vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4, %% Minor helpers %%---------------------------------------------------------------------------- +remove_pending_ack(KeepPersistent, + State = #vqstate { pending_ack = PA, + persistent_store = PersistentStore, + index_state = IndexState }) -> + {SeqIds, GuidsByStore} = + dict:fold(fun (SeqId, {IsPersistent, Guid}, {SeqIdsAcc, Dict}) -> + case IsPersistent of + true -> {[SeqId | SeqIdsAcc], + rabbit_misc:dict_cons( + PersistentStore, Guid, Dict)}; + false -> {SeqIdsAcc, + rabbit_misc:dict_cons( + ?TRANSIENT_MSG_STORE, Guid, Dict)} + end; + (_SeqId, #basic_message {}, Acc) -> + Acc + end, {[], dict:new()}, PA), + case KeepPersistent of + true -> + State1 = + State #vqstate { + pending_ack = + dict:filter( + fun (_SeqId, {IsPersistent, _Guid}) -> IsPersistent; + (_SeqId, #basic_message {}) -> false + end, PA) }, + case dict:find(?TRANSIENT_MSG_STORE, GuidsByStore) of + error -> State1; + {ok, Guids} -> ok = rabbit_msg_store:remove( + ?TRANSIENT_MSG_STORE, Guids), + State1 + end; + false -> + IndexState1 = rabbit_queue_index:write_acks(SeqIds, IndexState), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), + State #vqstate { pending_ack = dict:new(), + index_state = IndexState1 } + end. + lookup_tx(Txn) -> case get({txn, Txn}) of undefined -> #tx { pending_messages = [], |
