summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2010-08-16 00:13:42 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2010-08-16 00:13:42 +0100
commitc5c7e483e1d12222ba84fc7d7d805de6257cbb6d (patch)
tree7230469e84605bb4f65bd016b148fb008c5707c3
parent3afb316eeec2b5354497d949a46e3251e57bfda2 (diff)
downloadrabbitmq-server-git-c5c7e483e1d12222ba84fc7d7d805de6257cbb6d.tar.gz
Permit 0 ref counts and don't actually forget about messages until the file gets removed
-rw-r--r--src/rabbit_msg_store.erl118
1 files changed, 78 insertions, 40 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 4600efc124..c55380d332 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -332,9 +332,11 @@ read(Server, Guid,
Server, 2, {read, Guid}, infinity),
CState} end,
case index_lookup(Guid, CState) of
- not_found -> Defer();
- MsgLocation -> client_read1(Server, MsgLocation, Defer,
- CState)
+ Result when Result =:= not_found orelse
+ (Result #msg_location.ref_count =:= 0) ->
+ Defer();
+ MsgLocation ->
+ client_read1(Server, MsgLocation, Defer, CState)
end;
[{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
@@ -476,7 +478,8 @@ client_read3(Server, #msg_location { guid = Guid, file = File }, Defer,
%% badarg scenario above, but we don't have a missing file
%% - we just have the /wrong/ file).
case index_lookup(Guid, CState) of
- #msg_location { file = File } = MsgLocation ->
+ #msg_location { file = File, ref_count = RefCount } =
+ MsgLocation when RefCount > 0 ->
%% Still the same file.
mark_handle_open(FileHandlesEts, File),
@@ -619,37 +622,48 @@ handle_cast({write, Guid, Msg},
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
+ Write =
+ fun () ->
+ {ok, CurOffset} =
+ file_handle_cache:current_virtual_offset(CurHdl),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
+ ok = index_insert(
+ #msg_location {
+ guid = Guid, ref_count = 1, file = CurFile,
+ offset = CurOffset, total_size = TotalSize }, State),
+ [#file_summary { right = undefined,
+ locked = false,
+ file_size = FileSize } = Summary] =
+ ets:lookup(FileSummaryEts, CurFile),
+ ok = add_to_file_summary(Summary, TotalSize, CurOffset, CurFile,
+ FileSize + TotalSize, State),
+ NextOffset = CurOffset + TotalSize,
+ noreply(
+ maybe_roll_to_new_file(
+ NextOffset, State #msstate {
+ sum_valid_data = SumValid + TotalSize,
+ sum_file_size = SumFileSize + TotalSize }))
+ end,
case index_lookup(Guid, State) of
not_found ->
- %% New message, lots to do
- {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
- ok = index_insert(#msg_location {
- guid = Guid, ref_count = 1, file = CurFile,
- offset = CurOffset, total_size = TotalSize },
- State),
- [#file_summary { valid_total_size = ValidTotalSize,
- contiguous_top = ContiguousTop,
- right = undefined,
- locked = false,
- file_size = FileSize }] =
- ets:lookup(FileSummaryEts, CurFile),
- ValidTotalSize1 = ValidTotalSize + TotalSize,
- ContiguousTop1 = case CurOffset =:= ContiguousTop of
- true -> ValidTotalSize1;
- false -> ContiguousTop
- end,
- true = ets:update_element(
- FileSummaryEts, CurFile,
- [{#file_summary.valid_total_size, ValidTotalSize1},
- {#file_summary.contiguous_top, ContiguousTop1},
- {#file_summary.file_size, FileSize + TotalSize}]),
- NextOffset = CurOffset + TotalSize,
- noreply(
- maybe_roll_to_new_file(
- NextOffset, State #msstate {
- sum_valid_data = SumValid + TotalSize,
- sum_file_size = SumFileSize + TotalSize }));
+ Write();
+ #msg_location { ref_count = 0, file = File, offset = Offset,
+ total_size = TotalSize } ->
+ [#file_summary { locked = Locked,
+ file_size = FileSize } = Summary] =
+ ets:lookup(FileSummaryEts, File),
+ case Locked of
+ true ->
+ ok = index_delete(Guid, State),
+ Write();
+ false ->
+ ok = index_update_fields(
+ Guid, {#msg_location.ref_count, 1}, State),
+ ok = add_to_file_summary(Summary, TotalSize, Offset, File,
+ FileSize, State),
+ noreply(State #msstate {
+ sum_valid_data = SumValid + TotalSize })
+ end;
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
@@ -716,6 +730,7 @@ handle_cast({gc_done, Reclaimed, Src, Dst},
[{#file_summary.locked, false},
{#file_summary.right, SrcRight}]),
true = ets:delete(FileSummaryEts, Src),
+ ok = index_delete_by_file(Src, State),
noreply(
maybe_compact(run_pending(
State #msstate { sum_file_size = SumFileSize - Reclaimed,
@@ -804,10 +819,28 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
+add_to_file_summary(#file_summary { valid_total_size = ValidTotalSize,
+ contiguous_top = ContiguousTop },
+ TotalSize, Offset, File, FileSize,
+ #msstate { file_summary_ets = FileSummaryEts }) ->
+ ValidTotalSize1 = ValidTotalSize + TotalSize,
+ ContiguousTop1 = case Offset =:= ContiguousTop of
+ true -> ValidTotalSize1;
+ false -> ContiguousTop
+ end,
+ true =
+ ets:update_element(
+ FileSummaryEts, File,
+ [{#file_summary.valid_total_size, ValidTotalSize1},
+ {#file_summary.contiguous_top, ContiguousTop1},
+ {#file_summary.file_size, FileSize}]),
+ ok.
+
read_message(Guid, From,
State = #msstate { dedup_cache_ets = DedupCacheEts }) ->
case index_lookup(Guid, State) of
- not_found ->
+ Result when Result =:= not_found orelse
+ (Result #msg_location.ref_count =:= 0) ->
gen_server2:reply(From, not_found),
State;
MsgLocation ->
@@ -880,7 +913,8 @@ read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
case index_lookup(Guid, State) of
- not_found ->
+ Result when Result =:= not_found orelse
+ (Result #msg_location.ref_count =:= 0) ->
gen_server2:reply(From, false),
State;
#msg_location { file = File } ->
@@ -914,7 +948,9 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
true ->
add_to_pending_gc_completion({remove, Guid}, State);
false ->
- ok = index_delete(Guid, State),
+ ok = index_update_fields(
+ Guid, {#msg_location.ref_count, RefCount - 1},
+ State),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
true = ets:update_element(
@@ -927,9 +963,8 @@ remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
_ when 1 < RefCount ->
ok = decrement_cache(DedupCacheEts, Guid),
%% only update field, otherwise bad interaction with concurrent GC
- ok = index_update_fields(Guid,
- {#msg_location.ref_count, RefCount - 1},
- State),
+ ok = index_update_fields(
+ Guid, {#msg_location.ref_count, RefCount - 1}, State),
State
end.
@@ -1501,6 +1536,7 @@ delete_file_if_empty(File, State = #msstate {
end,
true = mark_handle_to_close(FileHandlesEts, File),
true = ets:delete(FileSummaryEts, File),
+ ok = index_delete_by_file(File, State),
State1 = close_handle(File, State),
ok = file:delete(form_filename(Dir, filenum_to_name(File))),
State1 #msstate { sum_file_size = SumFileSize - FileSize };
@@ -1611,7 +1647,9 @@ find_unremoved_messages_in_file(File,
lists:foldl(fun ({Guid, TotalSize, Offset}, Acc = {List, Size}) ->
case Index:lookup(Guid, IndexState) of
#msg_location { file = File, total_size = TotalSize,
- offset = Offset } = Entry ->
+ ref_count = RefCount,
+ offset = Offset } = Entry
+ when RefCount > 0 ->
{[ Entry | List ], TotalSize + Size};
_ ->
Acc