summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-08 11:27:24 +0000
committerAlexandru Scvortov <alexandru@rabbitmq.com>2010-11-08 11:27:24 +0000
commit764eb6054f47aedd28d538b033fbf9cdc48ed9ed (patch)
tree262fa3023335649b9589fc804ad079bd923742f5
parent405adb46fab99a8fe40cc373cc8b212276f6d8d2 (diff)
downloadrabbitmq-server-git-764eb6054f47aedd28d538b033fbf9cdc48ed9ed.tar.gz
confirm messages on queue purge
-rw-r--r--src/rabbit_msg_store.erl46
-rw-r--r--src/rabbit_queue_index.erl4
-rw-r--r--src/rabbit_variable_queue.erl6
3 files changed, 40 insertions, 16 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index d04f679f2a..e84e3d8f44 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -144,7 +144,7 @@
-type(startup_fun_state() ::
{(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})),
A}).
--type(guid_fun() :: fun (([rabbit_guid:guid()]) -> any())).
+-type(guid_fun() :: fun ((gb_set()) -> any())).
-spec(start_link/4 ::
(atom(), file:filename(), [binary()] | 'undefined',
@@ -397,7 +397,8 @@ read(Guid,
contains(Guid, CState) -> server_call(CState, {contains, Guid}).
remove([], _CState) -> ok;
-remove(Guids, CState) -> server_cast(CState, {remove, Guids}).
+remove(Guids, CState = #client_msstate { client_ref = CRef }) ->
+ server_cast(CState, {remove, CRef, Guids}).
release([], _CState) -> ok;
release(Guids, CState) -> server_cast(CState, {release, Guids}).
sync(Guids, K, CState) -> server_cast(CState, {sync, Guids, K}).
@@ -696,7 +697,10 @@ handle_cast({write, CRef, Guid},
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
[{Guid, Msg, _CacheRefCount}] = ets:lookup(CurFileCacheEts, Guid),
CTG1 = case dict:find(CRef, CODC) of
- {ok, _} -> rabbit_misc:dict_cons(CRef, Guid, CTG);
+ {ok, _} -> dict:update(CRef, fun(Guids) ->
+ gb_sets:add(Guid, Guids)
+ end,
+ gb_sets:empty(), CTG);
error -> CTG
end,
State1 = State #msstate { cref_to_guids = CTG1 },
@@ -722,17 +726,18 @@ handle_cast({write, CRef, Guid},
ok = index_update_ref_count(Guid, RefCount + 1, State1),
CTG2 = case {dict:find(CRef, CODC), File} of
{{ok, _}, CurFile} -> CTG1;
- {{ok, Fun}, _} -> Fun([Guid]), CTG;
+ {{ok, Fun}, _} -> Fun(gb_sets:singleton(Guid)), CTG;
_ -> CTG1
end,
noreply(State #msstate { cref_to_guids = CTG2 })
end;
-handle_cast({remove, Guids}, State) ->
+handle_cast({remove, CRef, Guids}, State) ->
State1 = lists:foldl(
fun (Guid, State2) -> remove_message(Guid, State2) end,
State, Guids),
- noreply(maybe_compact(State1));
+ State2 = client_confirm(CRef, gb_sets:from_list(Guids), State1),
+ noreply(maybe_compact(State2));
handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
@@ -856,17 +861,19 @@ stop_sync_timer(State = #msstate { sync_timer_ref = TRef }) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
on_sync = Syncs,
- client_ondisk_callback = CODC,
cref_to_guids = CTG }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (_CRef, [], NS) -> NS;
- (CRef, Guids, NS) -> [{CRef, Guids} | NS]
+ CGs = dict:fold(fun (CRef, Guids, NS) ->
+ case gb_sets:is_empty(Guids) of
+ true -> NS;
+ false -> [{CRef, Guids} | NS]
+ end
end, [], CTG),
if Syncs =:= [] andalso CGs =:= [] -> ok;
true -> file_handle_cache:sync(CurHdl)
end,
lists:foreach(fun (K) -> K() end, lists:reverse(Syncs)),
- [(dict:fetch(CRef, CODC))(Guids) || {CRef, Guids} <- CGs],
+ [client_confirm(CRef, Guids, State1) || {CRef, Guids} <- CGs],
State1 #msstate { cref_to_guids = dict:new(), on_sync = [] }.
@@ -1046,6 +1053,25 @@ orddict_store(Key, Val, Dict) ->
false = orddict:is_key(Key, Dict),
orddict:store(Key, Val, Dict).
+client_confirm(CRef, Guids,
+ State = #msstate { client_ondisk_callback = CODC,
+ cref_to_guids = CTG }) ->
+ case dict:find(CRef, CODC) of
+ {ok, Fun} -> Fun(Guids),
+ CTG1 = case dict:find(CRef, CTG) of
+ {ok, Gs} ->
+ Guids1 = gb_sets:difference(Gs, Guids),
+ case gb_sets:is_empty(Guids1) of
+ true -> dict:erase(CRef, CTG);
+ false -> dict:store(CRef, Guids1, CTG)
+ end;
+ error -> CTG
+ end,
+ State #msstate { cref_to_guids = CTG1 };
+ error -> State
+ end.
+
+
%%----------------------------------------------------------------------------
%% file helper functions
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 8a4bb801d9..f61707cc25 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -191,7 +191,7 @@
})).
-type(seq_id() :: integer()).
-type(seg_dict() :: {dict:dictionary(), [segment()]}).
--type(on_sync_fun() :: fun (([rabbit_guid:guid()]) -> ok)).
+-type(on_sync_fun() :: fun ((gb_set()) -> ok)).
-type(qistate() :: #qistate { dir :: file:filename(),
segments :: 'undefined' | seg_dict(),
journal_handle :: hdl(),
@@ -717,7 +717,7 @@ deliver_or_ack(Kind, SeqIds, State) ->
end, State1, SeqIds)).
notify_sync(State = #qistate { unsynced_guids = UG, on_sync = OnSyncFun }) ->
- OnSyncFun(UG),
+ OnSyncFun(gb_sets:from_list(UG)),
State #qistate { unsynced_guids = [] }.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 8463d3d55e..f065e5ea8f 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -1312,12 +1312,11 @@ remove_confirms(GuidSet, State = #vqstate { msgs_on_disk = MOD,
msgs_confirmed(GuidSet, State) ->
{remove_confirms(GuidSet, State), {confirm, gb_sets:to_list(GuidSet)}}.
-msgs_written_to_disk(QPid, Guids) ->
+msgs_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun(State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
msgs_confirmed(gb_sets:intersection(GuidSet, MIOD),
State #vqstate {
msgs_on_disk =
@@ -1325,12 +1324,11 @@ msgs_written_to_disk(QPid, Guids) ->
gb_sets:union(MOD, GuidSet), UC) })
end).
-msg_indices_written_to_disk(QPid, Guids) ->
+msg_indices_written_to_disk(QPid, GuidSet) ->
rabbit_amqqueue:maybe_run_queue_via_backing_queue_async(
QPid, fun(State = #vqstate { msgs_on_disk = MOD,
msg_indices_on_disk = MIOD,
unconfirmed = UC }) ->
- GuidSet = gb_sets:from_list(Guids),
msgs_confirmed(gb_sets:intersection(GuidSet, MOD),
State #vqstate {
msg_indices_on_disk =