diff options
| author | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-08 11:27:24 +0000 |
|---|---|---|
| committer | Alexandru Scvortov <alexandru@rabbitmq.com> | 2010-11-08 11:27:24 +0000 |
| commit | 764eb6054f47aedd28d538b033fbf9cdc48ed9ed (patch) | |
| tree | 262fa3023335649b9589fc804ad079bd923742f5 | |
| parent | 405adb46fab99a8fe40cc373cc8b212276f6d8d2 (diff) | |
| download | rabbitmq-server-git-764eb6054f47aedd28d538b033fbf9cdc48ed9ed.tar.gz | |
confirm messages on queue purge
| -rw-r--r-- | src/rabbit_msg_store.erl | 46 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 6 |
3 files changed, 40 insertions, 16 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index d04f679f2a..e84e3d8f44 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -144,7 +144,7 @@ -type(startup_fun_state() :: {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). --type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())). +-type(guid_fun() :: fun ((gb_set()) -> any())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', @@ -397,7 +397,8 @@ read(Guid, contains(Guid, CState) -> server_call(CState, {contains, Guid}). remove([], _CState) -> ok; -remove(Guids, CState) -> server_cast(CState, {remove, Guids}). +remove(Guids, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {remove, CRef, Guids}). release([], _CState) -> ok; release(Guids, CState) -> server_cast(CState, {release, Guids}). sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}). @@ -696,7 +697,10 @@ handle_cast({write, CRef, Guid}, true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), [{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid), CTG1 = case dict:find(CRef, CODC) of - {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG); + {ok, _} -> dict:update(CRef, fun(Guids) -> + gb_sets:add(Guid, Guids) + end, + gb_sets:empty(), CTG); error -> CTG end, State1 = State #msstate { cref_to_guids = CTG1 }, @@ -722,17 +726,18 @@ handle_cast({write, CRef, Guid}, ok = index_update_ref_count(Guid, RefCount + 1, State1), CTG2 = case {dict:find(CRef, CODC), File} of {{ok, _}, CurFile} -> CTG1; - {{ok, Fun}, _} -> Fun([Guid]), CTG; + {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), CTG; _ -> CTG1 end, noreply(State #msstate { cref_to_guids = CTG2 }) end; -handle_cast({remove, Guids}, State) -> +handle_cast({remove, CRef, Guids}, State) -> State1 = lists:foldl( fun (Guid, State2) -> remove_message(Guid, State2) end, State, Guids), - noreply(maybe_compact(State1)); + State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1), + noreply(maybe_compact(State2)); handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> @@ -856,17 +861,19 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) -> internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs, - client_ondisk_callback = CODC, cref_to_guids = CTG }) -> State1 = stop_sync_timer(State), - CGs = dict:fold(fun (_CRef, [], NS) -> NS; - (CRef, Guids, NS) -> [{CRef, Guids} | NS] + CGs = dict:fold(fun (CRef, Guids, NS) -> + case gb_sets:is_empty(Guids) of + true -> NS; + false -> [{CRef, Guids} | NS] + end end, [], CTG), if Syncs =:= [] andalso CGs =:= [] -> ok; true -> file_handle_cache:sync(CurHdl) end, lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)), - [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs], + [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs], State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }. @@ -1046,6 +1053,25 @@ orddict_store(Key, Val, Dict) -> false = orddict:is_key(Key, Dict), orddict:store(Key, Val, Dict). +client_confirm(CRef, Guids, + State = #msstate { client_ondisk_callback = CODC, + cref_to_guids = CTG }) -> + case dict:find(CRef, CODC) of + {ok, Fun} -> Fun(Guids), + CTG1 = case dict:find(CRef, CTG) of + {ok, Gs} -> + Guids1 = gb_sets:difference(Gs, Guids), + case gb_sets:is_empty(Guids1) of + true -> dict:erase(CRef, CTG); + false -> dict:store(CRef, Guids1, CTG) + end; + error -> CTG + end, + State #msstate { cref_to_guids = CTG1 }; + error -> State + end. + + %%---------------------------------------------------------------------------- %% file helper functions %%---------------------------------------------------------------------------- diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 8a4bb801d9..f61707cc25 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -191,7 +191,7 @@ })). -type(seq_id() :: integer()). -type(seg_dict() :: {dict:dictionary(), [segment()]}). --type(on_sync_fun() :: fun (([rabbit_guid:guid()]) -> ok)). +-type(on_sync_fun() :: fun ((gb_set()) -> ok)). -type(qistate() :: #qistate { dir :: file:filename(), segments :: 'undefined' | seg_dict(), journal_handle :: hdl(), @@ -717,7 +717,7 @@ deliver_or_ack(Kind, SeqIds, State) -> end, State1, SeqIds)). notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) -> - OnSyncFun(UG), + OnSyncFun(gb_sets:from_list(UG)), State #qistate { unsynced_guids = [] }. %%---------------------------------------------------------------------------- diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8463d3d55e..f065e5ea8f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1312,12 +1312,11 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD, msgs_confirmed(GuidSet, State) -> {remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}. -msgs_written_to_disk(QPid, Guids) -> +msgs_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun(State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), msgs_confirmed(gb_sets:intersection(GuidSet, MIOD), State #vqstate { msgs_on_disk = @@ -1325,12 +1324,11 @@ msgs_written_to_disk(QPid, Guids) -> gb_sets:union(MOD, GuidSet), UC) }) end). -msg_indices_written_to_disk(QPid, Guids) -> +msg_indices_written_to_disk(QPid, GuidSet) -> rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( QPid, fun(State = #vqstate { msgs_on_disk = MOD, msg_indices_on_disk = MIOD, unconfirmed = UC }) -> - GuidSet = gb_sets:from_list(Guids), msgs_confirmed(gb_sets:intersection(GuidSet, MOD), State #vqstate { msg_indices_on_disk = |
