diff options
| author | Matthew Sackman <matthew@lshift.net> | 2010-04-12 12:21:50 +0100 |
|---|---|---|
| committer | Matthew Sackman <matthew@lshift.net> | 2010-04-12 12:21:50 +0100 |
| commit | 9f3b55b939b29c46188e7e90937bf8edc204793f (patch) | |
| tree | 3af622c044ac012f8ffdb2d51dc9fcbc93de7006 /src | |
| parent | 703a4168d0f612544f657d0f51bdd8710f5dca1b (diff) | |
| download | rabbitmq-server-git-9f3b55b939b29c46188e7e90937bf8edc204793f.tar.gz | |
MsgId => Guid. All tests still pass. The distinction between msg_ids and guids is now complete
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_file.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 262 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 86 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 166 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 108 |
5 files changed, 323 insertions, 323 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl index 0edeb4698d..792f0efaba 100644 --- a/src/rabbit_msg_file.erl +++ b/src/rabbit_msg_file.erl @@ -64,14 +64,14 @@ %%---------------------------------------------------------------------------- -append(FileHdl, MsgId, MsgBody) - when is_binary(MsgId) andalso size(MsgId) =< ?GUID_SIZE_BYTES -> +append(FileHdl, Guid, MsgBody) + when is_binary(Guid) andalso size(Guid) =< ?GUID_SIZE_BYTES -> MsgBodyBin = term_to_binary(MsgBody), MsgBodyBinSize = size(MsgBodyBin), Size = MsgBodyBinSize + ?GUID_SIZE_BYTES, case file_handle_cache:append(FileHdl, <<Size:?INTEGER_SIZE_BITS, - MsgId:?GUID_SIZE_BYTES/binary, + Guid:?GUID_SIZE_BYTES/binary, MsgBodyBin:MsgBodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT}; @@ -83,10 +83,10 @@ read(FileHdl, TotalSize) -> BodyBinSize = Size - ?GUID_SIZE_BYTES, case file_handle_cache:read(FileHdl, TotalSize) of {ok, <<Size:?INTEGER_SIZE_BITS, - MsgId:?GUID_SIZE_BYTES/binary, + Guid:?GUID_SIZE_BYTES/binary, MsgBodyBin:BodyBinSize/binary, ?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} -> - {ok, {MsgId, binary_to_term(MsgBodyBin)}}; + {ok, {Guid, binary_to_term(MsgBodyBin)}}; KO -> KO end. @@ -97,8 +97,8 @@ scan(FileHdl, Offset, Acc) -> eof -> {ok, Acc, Offset}; {corrupted, NextOffset} -> scan(FileHdl, NextOffset, Acc); - {ok, {MsgId, TotalSize, NextOffset}} -> - scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]); + {ok, {Guid, TotalSize, NextOffset}} -> + scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]); _KO -> %% bad message, but we may still have recovered some valid messages {ok, Acc, Offset} @@ -108,9 +108,9 @@ read_next(FileHdl, Offset) -> case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of %% Here we take option 5 from %% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which - %% we read the MsgId as a number, and then convert it back to + %% we read the Guid as a number, and then convert it back to %% a binary in order to work around bugs in Erlang's GC. - {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?GUID_SIZE_BITS>>} -> + {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} -> case Size of 0 -> eof; %% Nothing we can do other than stop _ -> @@ -123,9 +123,9 @@ read_next(FileHdl, Offset) -> case file_handle_cache:read(FileHdl, 1) of {ok, <<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} -> - <<MsgId:?GUID_SIZE_BYTES/binary>> = - <<MsgIdNum:?GUID_SIZE_BITS>>, - {ok, {MsgId, TotalSize, NextOffset}}; + <<Guid:?GUID_SIZE_BYTES/binary>> = + <<GuidNum:?GUID_SIZE_BITS>>, + {ok, {Guid, TotalSize, NextOffset}}; {ok, _SomeOtherData} -> {corrupted, NextOffset}; KO -> KO diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 1a7085a2bd..b6d6c5daa2 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -152,7 +152,7 @@ %% The components: %% -%% MsgLocation: this is a mapping from MsgId to #msg_location{}: +%% MsgLocation: this is a mapping from Guid to #msg_location{}: %% {Guid, RefCount, File, Offset, TotalSize} %% By default, it's in ets, but it's also pluggable. %% FileSummary: this is an ets table which contains: @@ -307,29 +307,29 @@ start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) -> [Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit], [{timeout, infinity}]). -write(Server, MsgId, Msg, CState = +write(Server, Guid, Msg, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> - ok = add_to_cache(CurFileCacheEts, MsgId, Msg), - {gen_server2:cast(Server, {write, MsgId, Msg}), CState}. + ok = add_to_cache(CurFileCacheEts, Guid, Msg), + {gen_server2:cast(Server, {write, Guid, Msg}), CState}. -read(Server, MsgId, CState = +read(Server, Guid, CState = #client_msstate { dedup_cache_ets = DedupCacheEts, cur_file_cache_ets = CurFileCacheEts }) -> %% 1. Check the dedup cache - case fetch_and_increment_cache(DedupCacheEts, MsgId) of + case fetch_and_increment_cache(DedupCacheEts, Guid) of not_found -> %% 2. Check the cur file cache - case ets:lookup(CurFileCacheEts, MsgId) of + case ets:lookup(CurFileCacheEts, Guid) of [] -> Defer = fun() -> {gen_server2:pcall( - Server, 2, {read, MsgId}, infinity), + Server, 2, {read, Guid}, infinity), CState} end, - case index_lookup(MsgId, CState) of + case index_lookup(Guid, CState) of not_found -> Defer(); MsgLocation -> client_read1(Server, MsgLocation, Defer, CState) end; - [{MsgId, Msg, _CacheRefCount}] -> + [{Guid, Msg, _CacheRefCount}] -> %% Although we've found it, we don't know the %% refcount, so can't insert into dedup cache {{ok, Msg}, CState} @@ -338,10 +338,10 @@ read(Server, MsgId, CState = {{ok, Msg}, CState} end. -contains(Server, MsgId) -> gen_server2:call(Server, {contains, MsgId}, infinity). -remove(Server, MsgIds) -> gen_server2:cast(Server, {remove, MsgIds}). -release(Server, MsgIds) -> gen_server2:cast(Server, {release, MsgIds}). -sync(Server, MsgIds, K) -> gen_server2:cast(Server, {sync, MsgIds, K}). +contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity). +remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}). +release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}). +sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}). sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal gc_done(Server, Reclaimed, Source, Destination) -> @@ -381,37 +381,37 @@ clean(Server, BaseDir) -> %% Client-side-only helpers %%---------------------------------------------------------------------------- -add_to_cache(CurFileCacheEts, MsgId, Msg) -> - case ets:insert_new(CurFileCacheEts, {MsgId, Msg, 1}) of +add_to_cache(CurFileCacheEts, Guid, Msg) -> + case ets:insert_new(CurFileCacheEts, {Guid, Msg, 1}) of true -> ok; false -> try - ets:update_counter(CurFileCacheEts, MsgId, {3, +1}), + ets:update_counter(CurFileCacheEts, Guid, {3, +1}), ok - catch error:badarg -> add_to_cache(CurFileCacheEts, MsgId, Msg) + catch error:badarg -> add_to_cache(CurFileCacheEts, Guid, Msg) end end. -client_read1(Server, #msg_location { guid = MsgId, file = File } = +client_read1(Server, #msg_location { guid = Guid, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of [] -> %% File has been GC'd and no longer exists. Go around again. - read(Server, MsgId, CState); + read(Server, Guid, CState); [#file_summary { locked = Locked, right = Right }] -> client_read2(Server, Locked, Right, MsgLocation, Defer, CState) end. client_read2(_Server, false, undefined, - #msg_location { guid = MsgId, ref_count = RefCount }, Defer, + #msg_location { guid = Guid, ref_count = RefCount }, Defer, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, dedup_cache_ets = DedupCacheEts }) -> - case ets:lookup(CurFileCacheEts, MsgId) of + case ets:lookup(CurFileCacheEts, Guid) of [] -> Defer(); %% may have rolled over - [{MsgId, Msg, _CacheRefCount}] -> - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), + [{Guid, Msg, _CacheRefCount}] -> + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {{ok, Msg}, CState} end; client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> @@ -420,7 +420,7 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) -> %% the safest and simplest thing to do. Defer(); client_read2(Server, false, _Right, - #msg_location { guid = MsgId, ref_count = RefCount, file = File }, + #msg_location { guid = Guid, ref_count = RefCount, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -430,7 +430,7 @@ client_read2(Server, false, _Right, %% finished. try ets:update_counter(FileSummaryEts, File, {#file_summary.readers, +1}) catch error:badarg -> %% the File has been GC'd and deleted. Go around. - read(Server, MsgId, CState) + read(Server, Guid, CState) end, Release = fun() -> ets:update_counter(FileSummaryEts, File, {#file_summary.readers, -1}) @@ -452,7 +452,7 @@ client_read2(Server, false, _Right, %% readers, msg_store ets:deletes (and unlocks the dest) try Release(), Defer() - catch error:badarg -> read(Server, MsgId, CState) + catch error:badarg -> read(Server, Guid, CState) end; false -> %% Ok, we're definitely safe to continue - a GC can't @@ -468,7 +468,7 @@ client_read2(Server, false, _Right, %% badarg scenario above, but we don't have a missing file %% - we just have the /wrong/ file). - case index_lookup(MsgId, CState) of + case index_lookup(Guid, CState) of MsgLocation = #msg_location { file = File } -> %% Still the same file. %% This is fine to fail (already exists) @@ -476,7 +476,7 @@ client_read2(Server, false, _Right, CState1 = close_all_indicated(CState), {Msg, CState2} = read_from_disk(MsgLocation, CState1, DedupCacheEts), - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), Release(), %% this MUST NOT fail with badarg {{ok, Msg}, CState2}; @@ -589,12 +589,12 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) -> gc_pid = GCPid }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. -handle_call({read, MsgId}, From, State) -> - State1 = read_message(MsgId, From, State), +handle_call({read, Guid}, From, State) -> + State1 = read_message(Guid, From, State), noreply(State1); -handle_call({contains, MsgId}, From, State) -> - State1 = contains_message(MsgId, From, State), +handle_call({contains, Guid}, From, State) -> + State1 = contains_message(Guid, From, State), noreply(State1); handle_call({new_client_state, CRef}, _From, @@ -617,21 +617,21 @@ handle_call({delete_client, CRef}, _From, reply(ok, State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }). -handle_cast({write, MsgId, Msg}, +handle_cast({write, Guid, Msg}, State = #msstate { current_file_handle = CurHdl, current_file = CurFile, sum_valid_data = SumValid, sum_file_size = SumFileSize, file_summary_ets = FileSummaryEts, cur_file_cache_ets = CurFileCacheEts }) -> - true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), - case index_lookup(MsgId, State) of + true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}), + case index_lookup(Guid, State) of not_found -> %% New message, lots to do {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), - {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg), + {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), ok = index_insert(#msg_location { - guid = MsgId, ref_count = 1, file = CurFile, + guid = Guid, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, State), [#file_summary { valid_total_size = ValidTotalSize, @@ -661,34 +661,34 @@ handle_cast({write, MsgId, Msg}, #msg_location { ref_count = RefCount } -> %% We already know about it, just update counter. Only %% update field otherwise bad interaction with concurrent GC - ok = index_update_fields(MsgId, + ok = index_update_fields(Guid, {#msg_location.ref_count, RefCount + 1}, State), noreply(State) end; -handle_cast({remove, MsgIds}, State) -> +handle_cast({remove, Guids}, State) -> State1 = lists:foldl( - fun (MsgId, State2) -> remove_message(MsgId, State2) end, - State, MsgIds), + fun (Guid, State2) -> remove_message(Guid, State2) end, + State, Guids), noreply(maybe_compact(State1)); -handle_cast({release, MsgIds}, State = +handle_cast({release, Guids}, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> lists:foreach( - fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds), + fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids), noreply(State); -handle_cast({sync, MsgIds, K}, +handle_cast({sync, Guids, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, on_sync = Syncs }) -> {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (MsgId) -> + case lists:any(fun (Guid) -> #msg_location { file = File, offset = Offset } = - index_lookup(MsgId, State), + index_lookup(Guid, State), File =:= CurFile andalso Offset >= SyncOffset - end, MsgIds) of + end, Guids) of false -> K(), noreply(State); true -> noreply(State #msstate { on_sync = [K | Syncs] }) @@ -821,13 +821,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, State1 #msstate { on_sync = [] } end. -read_message(MsgId, From, State = +read_message(Guid, From, State = #msstate { dedup_cache_ets = DedupCacheEts }) -> - case index_lookup(MsgId, State) of + case index_lookup(Guid, State) of not_found -> gen_server2:reply(From, not_found), State; MsgLocation -> - case fetch_and_increment_cache(DedupCacheEts, MsgId) of + case fetch_and_increment_cache(DedupCacheEts, Guid) of not_found -> read_message1(From, MsgLocation, State); Msg -> @@ -836,7 +836,7 @@ read_message(MsgId, From, State = end end. -read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, +read_message1(From, #msg_location { guid = Guid, ref_count = RefCount, file = File, offset = Offset } = MsgLoc, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, @@ -847,7 +847,7 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, true -> {Msg, State1} = %% can return [] if msg in file existed on startup - case ets:lookup(CurFileCacheEts, MsgId) of + case ets:lookup(CurFileCacheEts, Guid) of [] -> ok = case {ok, Offset} >= file_handle_cache:current_raw_offset(CurHdl) of @@ -855,10 +855,10 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, false -> ok end, read_from_disk(MsgLoc, State, DedupCacheEts); - [{MsgId, Msg1, _CacheRefCount}] -> + [{Guid, Msg1, _CacheRefCount}] -> {Msg1, State} end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), gen_server2:reply(From, {ok, Msg}), State1; false -> @@ -866,7 +866,7 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, ets:lookup(FileSummaryEts, File), case Locked of true -> - add_to_pending_gc_completion({read, MsgId, From}, State); + add_to_pending_gc_completion({read, Guid, From}, State); false -> {Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts), gen_server2:reply(From, {ok, Msg}), @@ -874,36 +874,36 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount, end end. -read_from_disk(#msg_location { guid = MsgId, ref_count = RefCount, +read_from_disk(#msg_location { guid = Guid, ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize }, State, DedupCacheEts) -> {Hdl, State1} = get_read_handle(File, State), {ok, Offset} = file_handle_cache:position(Hdl, Offset), - {ok, {MsgId, Msg}} = + {ok, {Guid, Msg}} = case rabbit_msg_file:read(Hdl, TotalSize) of - {ok, {MsgId, _}} = Obj -> + {ok, {Guid, _}} = Obj -> Obj; Rest -> throw({error, {misread, [{old_state, State}, {file_num, File}, {offset, Offset}, - {guid, MsgId}, + {guid, Guid}, {read, Rest}, {proc_dict, get()} ]}}) end, - ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg), + ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg), {Msg, State1}. -maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg) +maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg) when RefCount > 1 -> - insert_into_cache(DedupCacheEts, MsgId, Msg); -maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) -> + insert_into_cache(DedupCacheEts, Guid, Msg); +maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) -> ok. -contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> - case index_lookup(MsgId, State) of +contains_message(Guid, From, State = #msstate { gc_active = GCActive }) -> + case index_lookup(Guid, State) of not_found -> gen_server2:reply(From, false), State; @@ -911,34 +911,34 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) -> case GCActive of {A, B} when File == A orelse File == B -> add_to_pending_gc_completion( - {contains, MsgId, From}, State); + {contains, Guid, From}, State); _ -> gen_server2:reply(From, true), State end end. -remove_message(MsgId, State = #msstate { sum_valid_data = SumValid, +remove_message(Guid, State = #msstate { sum_valid_data = SumValid, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts }) -> #msg_location { ref_count = RefCount, file = File, offset = Offset, total_size = TotalSize } = - index_lookup(MsgId, State), + index_lookup(Guid, State), case RefCount of 1 -> %% don't remove from CUR_FILE_CACHE_ETS_NAME here because %% there may be further writes in the mailbox for the same %% msg. - ok = remove_cache_entry(DedupCacheEts, MsgId), + ok = remove_cache_entry(DedupCacheEts, Guid), [#file_summary { valid_total_size = ValidTotalSize, contiguous_top = ContiguousTop, locked = Locked }] = ets:lookup(FileSummaryEts, File), case Locked of true -> - add_to_pending_gc_completion({remove, MsgId}, State); + add_to_pending_gc_completion({remove, Guid}, State); false -> - ok = index_delete(MsgId, State), + ok = index_delete(Guid, State), ContiguousTop1 = lists:min([ContiguousTop, Offset]), ValidTotalSize1 = ValidTotalSize - TotalSize, true = ets:update_element( @@ -949,9 +949,9 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid, State1 #msstate { sum_valid_data = SumValid - TotalSize } end; _ when 1 < RefCount -> - ok = decrement_cache(DedupCacheEts, MsgId), + ok = decrement_cache(DedupCacheEts, Guid), %% only update field, otherwise bad interaction with concurrent GC - ok = index_update_fields(MsgId, + ok = index_update_fields(Guid, {#msg_location.ref_count, RefCount - 1}, State), State @@ -967,12 +967,12 @@ run_pending(State = #msstate { pending_gc_completion = Pending }) -> State1 = State #msstate { pending_gc_completion = [] }, lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)). -run_pending({read, MsgId, From}, State) -> - read_message(MsgId, From, State); -run_pending({contains, MsgId, From}, State) -> - contains_message(MsgId, From, State); -run_pending({remove, MsgId}, State) -> - remove_message(MsgId, State). +run_pending({read, Guid, From}, State) -> + read_message(Guid, From, State); +run_pending({contains, Guid, From}, State) -> + contains_message(Guid, From, State); +run_pending({remove, Guid}, State) -> + remove_message(Guid, State). open_file(Dir, FileName, Mode) -> file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode, @@ -1091,45 +1091,45 @@ scan_file_for_valid_messages(Dir, FileName) -> %% message cache helper functions %%---------------------------------------------------------------------------- -remove_cache_entry(DedupCacheEts, MsgId) -> - true = ets:delete(DedupCacheEts, MsgId), +remove_cache_entry(DedupCacheEts, Guid) -> + true = ets:delete(DedupCacheEts, Guid), ok. -fetch_and_increment_cache(DedupCacheEts, MsgId) -> - case ets:lookup(DedupCacheEts, MsgId) of +fetch_and_increment_cache(DedupCacheEts, Guid) -> + case ets:lookup(DedupCacheEts, Guid) of [] -> not_found; - [{_MsgId, Msg, _RefCount}] -> + [{_Guid, Msg, _RefCount}] -> try - ets:update_counter(DedupCacheEts, MsgId, {3, 1}) + ets:update_counter(DedupCacheEts, Guid, {3, 1}) catch error:badarg -> %% someone has deleted us in the meantime, insert us - ok = insert_into_cache(DedupCacheEts, MsgId, Msg) + ok = insert_into_cache(DedupCacheEts, Guid, Msg) end, Msg end. -decrement_cache(DedupCacheEts, MsgId) -> - true = try case ets:update_counter(DedupCacheEts, MsgId, {3, -1}) of - N when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId); +decrement_cache(DedupCacheEts, Guid) -> + true = try case ets:update_counter(DedupCacheEts, Guid, {3, -1}) of + N when N =< 0 -> true = ets:delete(DedupCacheEts, Guid); _N -> true end catch error:badarg -> - %% MsgId is not in there because although it's been + %% Guid is not in there because although it's been %% delivered, it's never actually been read (think: %% persistent message held in RAM) true end, ok. -insert_into_cache(DedupCacheEts, MsgId, Msg) -> - case ets:insert_new(DedupCacheEts, {MsgId, Msg, 1}) of +insert_into_cache(DedupCacheEts, Guid, Msg) -> + case ets:insert_new(DedupCacheEts, {Guid, Msg, 1}) of true -> ok; false -> try - ets:update_counter(DedupCacheEts, MsgId, {3, 1}), + ets:update_counter(DedupCacheEts, Guid, {3, 1}), ok catch error:badarg -> - insert_into_cache(DedupCacheEts, MsgId, Msg) + insert_into_cache(DedupCacheEts, Guid, Msg) end end. @@ -1172,17 +1172,17 @@ count_msg_refs(true, _Gen, _Seed, _State) -> count_msg_refs(Gen, Seed, State) -> case Gen(Seed) of finished -> ok; - {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State); - {MsgId, Delta, Next} -> - ok = case index_lookup(MsgId, State) of + {_Guid, 0, Next} -> count_msg_refs(Gen, Next, State); + {Guid, Delta, Next} -> + ok = case index_lookup(Guid, State) of not_found -> - index_insert(#msg_location { guid = MsgId, + index_insert(#msg_location { guid = Guid, ref_count = Delta }, State); StoreEntry = #msg_location { ref_count = RefCount } -> NewRefCount = RefCount + Delta, case NewRefCount of - 0 -> index_delete(MsgId, State); + 0 -> index_delete(Guid, State); _ -> index_update(StoreEntry #msg_location { ref_count = NewRefCount }, State) @@ -1201,9 +1201,9 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) -> recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION, true = lists:member(NonTmpRelatedFileName, FileNames), - {ok, UncorruptedMessagesTmp, MsgIdsTmp} = + {ok, UncorruptedMessagesTmp, GuidsTmp} = scan_file_for_valid_messages_guids(Dir, TmpFileName), - {ok, UncorruptedMessages, MsgIds} = + {ok, UncorruptedMessages, Guids} = scan_file_for_valid_messages_guids(Dir, NonTmpRelatedFileName), %% 1) It's possible that everything in the tmp file is also in the %% main file such that the main file is (prefix ++ @@ -1232,7 +1232,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% back to before any of the files in the tmp file and copy %% them over again TmpPath = form_filename(Dir, TmpFileName), - case is_sublist(MsgIdsTmp, MsgIds) of + case is_sublist(GuidsTmp, Guids) of true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file %% note this also catches the case when the tmp file %% is empty @@ -1243,13 +1243,13 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> %% there are no msgs in the tmp file then we would be in %% the 'true' branch of this case, so we know the %% lists:last call is safe. - EldestTmpMsgId = lists:last(MsgIdsTmp), - {MsgIds1, UncorruptedMessages1} + EldestTmpGuid = lists:last(GuidsTmp), + {Guids1, UncorruptedMessages1} = case lists:splitwith( - fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of - {_MsgIds, []} -> %% no msgs from tmp in main - {MsgIds, UncorruptedMessages}; - {Dropped, [EldestTmpMsgId | Rest]} -> + fun (Guid) -> Guid /= EldestTmpGuid end, Guids) of + {_Guids, []} -> %% no msgs from tmp in main + {Guids, UncorruptedMessages}; + {Dropped, [EldestTmpGuid | Rest]} -> %% Msgs in Dropped are in tmp, so forget them. %% *cry*. Lists indexed from 1. {Rest, lists:sublist(UncorruptedMessages, @@ -1257,11 +1257,11 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> length(Rest))} end, %% The main file prefix should be contiguous - {Top, MsgIds1} = find_contiguous_block_prefix( + {Top, Guids1} = find_contiguous_block_prefix( lists:reverse(UncorruptedMessages1)), %% we should have that none of the messages in the prefix %% are in the tmp file - true = is_disjoint(MsgIds1, MsgIdsTmp), + true = is_disjoint(Guids1, GuidsTmp), %% must open with read flag, otherwise will stomp over contents {ok, MainHdl} = open_file( Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]), @@ -1281,13 +1281,13 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) -> ok = file_handle_cache:close(MainHdl), ok = file_handle_cache:delete(TmpHdl), - {ok, _MainMessages, MsgIdsMain} = + {ok, _MainMessages, GuidsMain} = scan_file_for_valid_messages_guids( Dir, NonTmpRelatedFileName), - %% check that everything in MsgIds1 is in MsgIdsMain - true = is_sublist(MsgIds1, MsgIdsMain), - %% check that everything in MsgIdsTmp is in MsgIdsMain - true = is_sublist(MsgIdsTmp, MsgIdsMain) + %% check that everything in Guids1 is in GuidsMain + true = is_sublist(Guids1, GuidsMain), + %% check that everything in GuidsTmp is in GuidsMain + true = is_sublist(GuidsTmp, GuidsMain) end, ok. @@ -1300,7 +1300,7 @@ is_disjoint(SmallerL, BiggerL) -> scan_file_for_valid_messages_guids(Dir, FileName) -> {ok, Messages, _FileSize} = scan_file_for_valid_messages(Dir, FileName), - {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}. + {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}. %% Takes the list in *ascending* order (i.e. eldest message %% first). This is the opposite of what scan_file_for_valid_messages @@ -1309,14 +1309,14 @@ find_contiguous_block_prefix([]) -> {0, []}; find_contiguous_block_prefix(List) -> find_contiguous_block_prefix(List, 0, []). -find_contiguous_block_prefix([], ExpectedOffset, MsgIds) -> - {ExpectedOffset, MsgIds}; -find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail], - ExpectedOffset, MsgIds) -> +find_contiguous_block_prefix([], ExpectedOffset, Guids) -> + {ExpectedOffset, Guids}; +find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail], + ExpectedOffset, Guids) -> ExpectedOffset1 = ExpectedOffset + TotalSize, - find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]); -find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) -> - {ExpectedOffset, MsgIds}. + find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]); +find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) -> + {ExpectedOffset, Guids}. build_index(true, _Files, State = #msstate { file_summary_ets = FileSummaryEts }) -> @@ -1373,8 +1373,8 @@ build_index_worker( Dir, filenum_to_name(File)), {ValidMessages, ValidTotalSize} = lists:foldl( - fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) -> - case index_lookup(MsgId, State) of + fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) -> + case index_lookup(Guid, State) of not_found -> {VMAcc, VTSAcc}; StoreEntry -> ok = index_update(StoreEntry #msg_location { @@ -1394,7 +1394,7 @@ build_index_worker( %% file size. [] -> {undefined, case ValidMessages of [] -> 0; - _ -> {_MsgId, TotalSize, Offset} = + _ -> {_Guid, TotalSize, Offset} = lists:last(ValidMessages), Offset + TotalSize end}; @@ -1649,8 +1649,8 @@ find_unremoved_messages_in_file(File, Dir, filenum_to_name(File)), %% foldl will reverse so will end up with msgs in ascending offset order lists:foldl( - fun ({MsgId, _TotalSize, _Offset}, Acc) -> - case Index:lookup(MsgId, IndexState) of + fun ({Guid, _TotalSize, _Offset}, Acc) -> + case Index:lookup(Guid, IndexState) of Entry = #msg_location { file = File } -> [ Entry | Acc ]; _ -> Acc end @@ -1660,13 +1660,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl, Destination, {_FileSummaryEts, _Dir, Index, IndexState}) -> {FinalOffset, BlockStart1, BlockEnd1} = lists:foldl( - fun (#msg_location { guid = MsgId, offset = Offset, + fun (#msg_location { guid = Guid, offset = Offset, total_size = TotalSize }, {CurOffset, BlockStart, BlockEnd}) -> %% CurOffset is in the DestinationFile. %% Offset, BlockStart and BlockEnd are in the SourceFile %% update MsgLocation to reflect change of file and offset - ok = Index:update_fields(MsgId, + ok = Index:update_fields(Guid, [{#msg_location.file, Destination}, {#msg_location.offset, CurOffset}], IndexState), diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 556c6968e4..2d9b66738f 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -103,7 +103,7 @@ %% and seeding the message store on start up. %% Note that in general, the representation of a message's state as -%% the tuple: {('no_pub'|{MsgId, IsPersistent}), ('del'|'no_del'), +%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'), %% ('ack'|'no_ack')} is richer than strictly necessary for most %% operations. However, for startup, and to ensure the safe and %% correct combination of journal entries with entries read from the @@ -265,12 +265,12 @@ init(Name, MsgStoreRecovered) -> Segment2 = #segment { pubs = PubCount1, acks = AckCount1 } = array:sparse_foldl( - fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack}, + fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack}, Segment3) -> Segment4 = maybe_add_to_journal( rabbit_msg_store:contains( - ?PERSISTENT_MSG_STORE, MsgId), + ?PERSISTENT_MSG_STORE, Guid), CleanShutdown, Del, RelSeq, Segment3), Segment4 end, Segment1 #segment { pubs = PubCount, @@ -327,15 +327,15 @@ terminate_and_erase(State) -> ok = delete_queue_directory(State1 #qistate.dir), State1. -write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) -> - ?GUID_BYTES = size(MsgId), +write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) -> + ?GUID_BYTES = size(Guid), {JournalHdl, State1} = get_journal_handle(State), ok = file_handle_cache:append( JournalHdl, [<<(case IsPersistent of true -> ?PUB_PERSIST_JPREFIX; false -> ?PUB_TRANS_JPREFIX - end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId]), - maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)). + end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]), + maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)). write_delivered(SeqId, State) -> {JournalHdl, State1} = get_journal_handle(State), @@ -396,8 +396,8 @@ read_segment_entries(InitSeqId, State = #qistate { segments = Segments, {SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment), #segment { journal_entries = JEntries } = Segment1, {array:sparse_foldr( - fun (RelSeq, {{MsgId, IsPersistent}, IsDelivered, no_ack}, Acc) -> - [ {MsgId, reconstruct_seq_id(Seg, RelSeq), + fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) -> + [ {Guid, reconstruct_seq_id(Seg, RelSeq), IsPersistent, IsDelivered == del} | Acc ] end, [], journal_plus_segment(JEntries, SegEntries)), State #qistate { segments = segment_store(Segment1, Segments) }}. @@ -492,7 +492,7 @@ queue_index_walker({[], Gatherer}) -> case gatherer:fetch(Gatherer) of finished -> rabbit_misc:unlink_and_capture_exit(Gatherer), finished; - {value, {MsgId, Count}} -> {MsgId, Count, {[], Gatherer}} + {value, {Guid, Count}} -> {Guid, Count, {[], Gatherer}} end; queue_index_walker({[QueueName | QueueNames], Gatherer}) -> Child = make_ref(), @@ -519,9 +519,9 @@ queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) -> queue_index_walker_reader1(_Gatherer, State, []) -> State; queue_index_walker_reader1( - Gatherer, State, [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> + Gatherer, State, [{Guid, _SeqId, IsPersistent, _IsDelivered} | Msgs]) -> case IsPersistent of - true -> gatherer:produce(Gatherer, {MsgId, 1}); + true -> gatherer:produce(Gatherer, {Guid, 1}); false -> ok end, queue_index_walker_reader1(Gatherer, State, Msgs). @@ -684,17 +684,17 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) -> bool_to_int(true ) -> 1; bool_to_int(false) -> 0. -write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) -> +write_entry_to_segment(_RelSeq, {{_Guid, _IsPersistent}, del, ack}, Hdl) -> Hdl; write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) -> ok = case Pub of no_pub -> ok; - {MsgId, IsPersistent} -> + {Guid, IsPersistent} -> file_handle_cache:append( Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS, (bool_to_int(IsPersistent)):1, - RelSeq:?REL_SEQ_BITS>>, MsgId]) + RelSeq:?REL_SEQ_BITS>>, Guid]) end, ok = case {Del, Ack} of {no_del, no_ack} -> @@ -775,10 +775,10 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) -> IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} -> %% because we specify /binary, and binaries are complete %% bytes, the size spec is in bytes, not bits. - {ok, MsgId} = file_handle_cache:read(Hdl, ?GUID_BYTES), + {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES), SegEntries1 = array:set(RelSeq, - {{MsgId, 1 == IsPersistentNum}, no_del, no_ack}, + {{Guid, 1 == IsPersistentNum}, no_del, no_ack}, SegEntries), load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1, AckCount); @@ -837,13 +837,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) -> load_journal_entries(add_to_journal(SeqId, ack, State)); _ -> case file_handle_cache:read(Hdl, ?GUID_BYTES) of - {ok, <<MsgIdNum:?GUID_BITS>>} -> + {ok, <<GuidNum:?GUID_BITS>>} -> %% work around for binary data %% fragmentation. See %% rabbit_msg_file:read_next/2 - <<MsgId:?GUID_BYTES/binary>> = - <<MsgIdNum:?GUID_BITS>>, - Publish = {MsgId, case Prefix of + <<Guid:?GUID_BYTES/binary>> = + <<GuidNum:?GUID_BITS>>, + Publish = {Guid, case Prefix of ?PUB_PERSIST_JPREFIX -> true; ?PUB_TRANS_JPREFIX -> false end}, @@ -873,7 +873,7 @@ add_to_journal(RelSeq, Action, case Action of del -> Segment1; ack -> Segment1 #segment { acks = AckCount + 1 }; - {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } + {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 } end; %% This is a more relaxed version of deliver_or_ack_msg because we can @@ -912,30 +912,30 @@ journal_plus_segment(JEntries, SegEntries) -> %% Here, the Out is the Seg Array which we may be adding to (for %% items only in the journal), modifying (bits in both), or erasing %% from (ack in journal, not segment). -journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, +journal_plus_segment(Obj = {{_Guid, _IsPersistent}, no_del, no_ack}, not_found, RelSeq, Out) -> array:set(RelSeq, Obj, Out); -journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, +journal_plus_segment(Obj = {{_Guid, _IsPersistent}, del, no_ack}, not_found, RelSeq, Out) -> array:set(RelSeq, Obj, Out); -journal_plus_segment({{_MsgId, _IsPersistent}, del, ack}, +journal_plus_segment({{_Guid, _IsPersistent}, del, ack}, not_found, RelSeq, Out) -> array:reset(RelSeq, Out); journal_plus_segment({no_pub, del, no_ack}, - {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, + {Pub = {_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out) -> array:set(RelSeq, {Pub, del, no_ack}, Out); journal_plus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, + {{_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out) -> array:reset(RelSeq, Out); journal_plus_segment({no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, + {{_Guid, _IsPersistent}, del, no_ack}, RelSeq, Out) -> array:reset(RelSeq, Out). @@ -958,77 +958,77 @@ journal_minus_segment(JEntries, SegEntries) -> %% publish or ack is in both the journal and the segment. %% Both the same. Must be at least the publish -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack}, +journal_minus_segment(Obj, Obj = {{_Guid, _IsPersistent}, _Del, no_ack}, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved + 1, AcksRemoved}; -journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack}, +journal_minus_segment(Obj, Obj = {{_Guid, _IsPersistent}, _Del, ack}, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved + 1, AcksRemoved + 1}; %% Just publish in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack}, +journal_minus_segment(Obj = {{_Guid, _IsPersistent}, no_del, no_ack}, not_found, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; %% Just deliver in journal journal_minus_segment(Obj = {no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, + {{_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, no_ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, + {{_Guid, _IsPersistent}, del, no_ack}, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved, AcksRemoved}; %% Just ack in journal journal_minus_segment(Obj = {no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, + {{_Guid, _IsPersistent}, del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, no_del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, + {{_Guid, _IsPersistent}, del, ack}, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved, AcksRemoved}; %% Publish and deliver in journal -journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack}, +journal_minus_segment(Obj = {{_Guid, _IsPersistent}, del, no_ack}, not_found, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({Pub, del, no_ack}, - {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, + {Pub = {_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, del, no_ack}, Out), PubsRemoved + 1, AcksRemoved}; %% Deliver and ack in journal journal_minus_segment(Obj = {no_pub, del, ack}, - {{_MsgId, _IsPersistent}, no_del, no_ack}, + {{_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, no_ack}, + {{_Guid, _IsPersistent}, del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, no_del, ack}, Out), PubsRemoved, AcksRemoved}; journal_minus_segment({no_pub, del, ack}, - {{_MsgId, _IsPersistent}, del, ack}, + {{_Guid, _IsPersistent}, del, ack}, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved, AcksRemoved + 1}; %% Publish, deliver and ack in journal -journal_minus_segment({{_MsgId, _IsPersistent}, del, ack}, +journal_minus_segment({{_Guid, _IsPersistent}, del, ack}, not_found, _RelSeq, Out, PubsRemoved, AcksRemoved) -> {Out, PubsRemoved, AcksRemoved}; journal_minus_segment({Pub, del, ack}, - {Pub = {_MsgId, _IsPersistent}, no_del, no_ack}, + {Pub = {_Guid, _IsPersistent}, no_del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, del, ack}, Out), PubsRemoved + 1, AcksRemoved}; journal_minus_segment({Pub, del, ack}, - {Pub = {_MsgId, _IsPersistent}, del, no_ack}, + {Pub = {_Guid, _IsPersistent}, del, no_ack}, RelSeq, Out, PubsRemoved, AcksRemoved) -> {array:set(RelSeq, {no_pub, no_del, ack}, Out), PubsRemoved + 1, AcksRemoved}. diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index 8eb129394d..66f2d3cc3f 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1022,170 +1022,170 @@ stop_msg_store() -> guid_bin(X) -> erlang:md5(term_to_binary(X)). -msg_store_contains(Atom, MsgIds) -> +msg_store_contains(Atom, Guids) -> Atom = lists:foldl( - fun (MsgId, Atom1) when Atom1 =:= Atom -> - rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId) end, - Atom, MsgIds). + fun (Guid, Atom1) when Atom1 =:= Atom -> + rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, + Atom, Guids). -msg_store_sync(MsgIds) -> +msg_store_sync(Guids) -> Ref = make_ref(), Self = self(), - ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, MsgIds, + ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids, fun () -> Self ! {sync, Ref} end), receive {sync, Ref} -> ok after 10000 -> - io:format("Sync from msg_store missing for guids ~p~n", [MsgIds]), + io:format("Sync from msg_store missing for guids ~p~n", [Guids]), throw(timeout) end. -msg_store_read(MsgIds, MSCState) -> +msg_store_read(Guids, MSCState) -> lists:foldl( - fun (MsgId, MSCStateM) -> - {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read( - ?PERSISTENT_MSG_STORE, MsgId, MSCStateM), + fun (Guid, MSCStateM) -> + {{ok, Guid}, MSCStateN} = rabbit_msg_store:read( + ?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN end, - MSCState, MsgIds). + MSCState, Guids). -msg_store_write(MsgIds, MSCState) -> +msg_store_write(Guids, MSCState) -> lists:foldl( - fun (MsgId, {ok, MSCStateN}) -> - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, MsgId, MSCStateN) end, - {ok, MSCState}, MsgIds). + fun (Guid, {ok, MSCStateN}) -> + rabbit_msg_store:write(?PERSISTENT_MSG_STORE, Guid, Guid, MSCStateN) end, + {ok, MSCState}, Guids). test_msg_store() -> stop_msg_store(), ok = start_msg_store_empty(), Self = self(), - MsgIds = [guid_bin(M) || M <- lists:seq(1,100)], - {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds), + Guids = [guid_bin(M) || M <- lists:seq(1,100)], + {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), %% check we don't contain any of the msgs we're about to publish - false = msg_store_contains(false, MsgIds), + false = msg_store_contains(false, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), %% publish the first half - {ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState), + {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState), %% sync on the first half - ok = msg_store_sync(MsgIds1stHalf), + ok = msg_store_sync(Guids1stHalf), %% publish the second half - {ok, MSCState2} = msg_store_write(MsgIds2ndHalf, MSCState1), + {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1), %% sync on the first half again - the msg_store will be dirty, but %% we won't need the fsync - ok = msg_store_sync(MsgIds1stHalf), + ok = msg_store_sync(Guids1stHalf), %% check they're all in there - true = msg_store_contains(true, MsgIds), + true = msg_store_contains(true, Guids), %% publish the latter half twice so we hit the caching and ref count code - {ok, MSCState3} = msg_store_write(MsgIds2ndHalf, MSCState2), + {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2), %% check they're still all in there - true = msg_store_contains(true, MsgIds), + true = msg_store_contains(true, Guids), %% sync on the 2nd half, but do lots of individual syncs to try %% and cause coalescing to happen ok = lists:foldl( - fun (MsgId, ok) -> rabbit_msg_store:sync( + fun (Guid, ok) -> rabbit_msg_store:sync( ?PERSISTENT_MSG_STORE, - [MsgId], fun () -> Self ! {sync, MsgId} end) - end, ok, MsgIds2ndHalf), + [Guid], fun () -> Self ! {sync, Guid} end) + end, ok, Guids2ndHalf), lists:foldl( - fun(MsgId, ok) -> + fun(Guid, ok) -> receive - {sync, MsgId} -> ok + {sync, Guid} -> ok after 10000 -> io:format("Sync from msg_store missing (guid: ~p)~n", - [MsgId]), + [Guid]), throw(timeout) end - end, ok, MsgIds2ndHalf), + end, ok, Guids2ndHalf), %% it's very likely we're not dirty here, so the 1st half sync %% should hit a different code path - ok = msg_store_sync(MsgIds1stHalf), + ok = msg_store_sync(Guids1stHalf), %% read them all - MSCState4 = msg_store_read(MsgIds, MSCState3), + MSCState4 = msg_store_read(Guids, MSCState3), %% read them all again - this will hit the cache, not disk - MSCState5 = msg_store_read(MsgIds, MSCState4), + MSCState5 = msg_store_read(Guids, MSCState4), %% remove them all - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds), + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids), %% check first half doesn't exist - false = msg_store_contains(false, MsgIds1stHalf), + false = msg_store_contains(false, Guids1stHalf), %% check second half does exist - true = msg_store_contains(true, MsgIds2ndHalf), + true = msg_store_contains(true, Guids2ndHalf), %% read the second half again - MSCState6 = msg_store_read(MsgIds2ndHalf, MSCState5), + MSCState6 = msg_store_read(Guids2ndHalf, MSCState5), %% release the second half, just for fun (aka code coverage) - ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIds2ndHalf), + ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf), %% read the second half again, just for fun (aka code coverage) - MSCState7 = msg_store_read(MsgIds2ndHalf, MSCState6), + MSCState7 = msg_store_read(Guids2ndHalf, MSCState6), ok = rabbit_msg_store:client_terminate(MSCState7), %% stop and restart, preserving every other msg in 2nd half ok = stop_msg_store(), ok = start_msg_store(fun ([]) -> finished; - ([MsgId|MsgIdsTail]) - when length(MsgIdsTail) rem 2 == 0 -> - {MsgId, 1, MsgIdsTail}; - ([MsgId|MsgIdsTail]) -> - {MsgId, 0, MsgIdsTail} - end, MsgIds2ndHalf), + ([Guid|GuidsTail]) + when length(GuidsTail) rem 2 == 0 -> + {Guid, 1, GuidsTail}; + ([Guid|GuidsTail]) -> + {Guid, 0, GuidsTail} + end, Guids2ndHalf), %% check we have the right msgs left lists:foldl( - fun (MsgId, Bool) -> - not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId)) - end, false, MsgIds2ndHalf), + fun (Guid, Bool) -> + not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid)) + end, false, Guids2ndHalf), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% check we don't contain any of the msgs - false = msg_store_contains(false, MsgIds), + false = msg_store_contains(false, Guids), %% publish the first half again MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), - {ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8), + {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8), %% this should force some sort of sync internally otherwise misread ok = rabbit_msg_store:client_terminate( - msg_store_read(MsgIds1stHalf, MSCState9)), - ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf), + msg_store_read(Guids1stHalf, MSCState9)), + ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), %% now safe to reuse guids %% push a lot of msgs in... BigCount = 100000, - MsgIdsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], + GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)], Payload = << 0:65536 >>, ok = rabbit_msg_store:client_terminate( lists:foldl( - fun (MsgId, MSCStateN) -> + fun (Guid, MSCStateN) -> {ok, MSCStateM} = - rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN), + rabbit_msg_store:write(?PERSISTENT_MSG_STORE, Guid, Payload, MSCStateN), MSCStateM - end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)), + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), GuidsBig)), %% now read them to ensure we hit the fast client-side reading ok = rabbit_msg_store:client_terminate( lists:foldl( - fun (MsgId, MSCStateM) -> + fun (Guid, MSCStateM) -> {{ok, Payload}, MSCStateN} = - rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM), + rabbit_msg_store:read(?PERSISTENT_MSG_STORE, Guid, MSCStateM), MSCStateN - end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)), + end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), GuidsBig)), %% .., then 3s by 1... ok = lists:foldl( - fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) + fun (Guid, ok) -> + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)]) end, ok, lists:seq(BigCount, 1, -3)), %% .., then remove 3s by 2, from the young end first. This hits %% GC (under 50% good data left, but no empty files. Must GC). ok = lists:foldl( - fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) + fun (Guid, ok) -> + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)]) end, ok, lists:seq(BigCount-1, 1, -3)), %% .., then remove 3s by 3, from the young end first. This hits %% GC... ok = lists:foldl( - fun (MsgId, ok) -> - rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)]) + fun (Guid, ok) -> + rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)]) end, ok, lists:seq(BigCount-2, 1, -3)), %% ensure empty - false = msg_store_contains(false, MsgIdsBig), + false = msg_store_contains(false, GuidsBig), %% restart empty ok = stop_msg_store(), ok = start_msg_store_empty(), @@ -1211,13 +1211,13 @@ queue_index_publish(SeqIds, Persistent, Qi) -> end, {A, B, MSCStateEnd} = lists:foldl( - fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) -> + fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) -> Guid = rabbit_guid:guid(), QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent, QiN), {ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid, Guid, MSCStateN), - {QiM, [{SeqId, Guid} | SeqIdsMsgIdsAcc], MSCStateM} + {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM} end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds), ok = rabbit_msg_store:delete_client(MsgStore, Ref), ok = rabbit_msg_store:client_terminate(MSCStateEnd), @@ -1235,8 +1235,8 @@ queue_index_flush_journal(Qi) -> verify_read_with_published(_Delivered, _Persistent, [], _) -> ok; verify_read_with_published(Delivered, Persistent, - [{MsgId, SeqId, Persistent, Delivered}|Read], - [{SeqId, MsgId}|Published]) -> + [{Guid, SeqId, Persistent, Delivered}|Read], + [{SeqId, Guid}|Published]) -> verify_read_with_published(Delivered, Persistent, Read, Published); verify_read_with_published(_Delivered, _Persistent, _Read, _Published) -> ko. @@ -1251,12 +1251,12 @@ test_queue_index() -> {0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false), {0, 0, Qi1} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0), - {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1), + {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1), {0, SegmentSize, Qi3} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2), {ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3), ok = verify_read_with_published(false, false, ReadA, - lists:reverse(SeqIdsMsgIdsA)), + lists:reverse(SeqIdsGuidsA)), %% call terminate twice to prove it's idempotent _Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)), ok = stop_msg_store(), @@ -1265,12 +1265,12 @@ test_queue_index() -> {0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false), {0, 0, Qi7} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6), - {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7), + {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7), {0, TwoSegs, Qi9} = rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8), {ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9), ok = verify_read_with_published(false, true, ReadB, - lists:reverse(SeqIdsMsgIdsB)), + lists:reverse(SeqIdsGuidsB)), _Qi11 = rabbit_queue_index:terminate([], Qi10), ok = stop_msg_store(), ok = rabbit_queue_index:start_msg_stores([test_queue()]), @@ -1282,7 +1282,7 @@ test_queue_index() -> Qi14 = queue_index_deliver(SeqIdsB, Qi13), {ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14), ok = verify_read_with_published(true, true, ReadC, - lists:reverse(SeqIdsMsgIdsB)), + lists:reverse(SeqIdsGuidsB)), Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15), Qi17 = queue_index_flush_journal(Qi16), %% Everything will have gone now because #pubs == #acks @@ -1302,20 +1302,20 @@ test_queue_index() -> %% a) partial pub+del+ack, then move to new segment SeqIdsC = lists:seq(0,trunc(SegmentSize/2)), {0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false), - {Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22), + {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22), Qi24 = queue_index_deliver(SeqIdsC, Qi23), Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24), Qi26 = queue_index_flush_journal(Qi25), - {Qi27, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize], false, Qi26), + {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26), _Qi28 = rabbit_queue_index:terminate_and_erase(Qi27), ok = stop_msg_store(), ok = empty_test_queue(), %% b) partial pub+del, then move to new segment, then ack all in old segment {0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false), - {Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29), + {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29), Qi31 = queue_index_deliver(SeqIdsC, Qi30), - {Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31), + {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31), Qi33 = rabbit_queue_index:write_acks(SeqIdsC, Qi32), Qi34 = queue_index_flush_journal(Qi33), _Qi35 = rabbit_queue_index:terminate_and_erase(Qi34), @@ -1325,7 +1325,7 @@ test_queue_index() -> %% c) just fill up several segments of all pubs, then +dels, then +acks SeqIdsD = lists:seq(0,SegmentSize*4), {0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false), - {Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36), + {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36), Qi38 = queue_index_deliver(SeqIdsD, Qi37), Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38), Qi40 = queue_index_flush_journal(Qi39), diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index f2e9c19c56..164533b716 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -319,7 +319,7 @@ publish(Msg, State) -> {_SeqId, State2} = publish(Msg, false, false, State1), State2. -publish_delivered(Msg = #basic_message { guid = MsgId, +publish_delivered(Msg = #basic_message { guid = Guid, is_persistent = IsPersistent }, State = #vqstate { len = 0, index_state = IndexState, next_seq_id = SeqId, @@ -331,7 +331,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, State1 = State #vqstate { out_counter = OutCount + 1, in_counter = InCount + 1 }, MsgStatus = #msg_status { - msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = true, msg_on_disk = false, index_on_disk = false }, {MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), @@ -344,7 +344,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId, true -> {#msg_status { index_on_disk = true }, IndexState1} = maybe_write_index_to_disk(false, MsgStatus1, IndexState), - {{ack_index_and_store, MsgId, SeqId, + {{ack_index_and_store, Guid, SeqId, find_msg_store(IsPersistent, PersistentStore)}, State2 #vqstate { index_state = IndexState1, next_seq_id = SeqId + 1 }}; @@ -411,7 +411,7 @@ fetch(State = {empty, _Q4} -> fetch_from_q3_or_delta(State); {{value, #msg_status { - msg = Msg, guid = MsgId, seq_id = SeqId, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }}, Q4a} -> @@ -436,11 +436,11 @@ fetch(State = AckTag = case IsPersistent of true -> true = MsgOnDisk, %% ASSERTION - {ack_index_and_store, MsgId, SeqId, MsgStore}; + {ack_index_and_store, Guid, SeqId, MsgStore}; false -> ok = case MsgOnDisk of true -> rabbit_msg_store:remove( - MsgStore, [MsgId]); + MsgStore, [Guid]); false -> ok end, ack_not_on_disk @@ -457,22 +457,22 @@ ack([], State) -> ack(AckTags, State = #vqstate { index_state = IndexState, persistent_count = PCount, persistent_store = PersistentStore }) -> - {MsgIdsByStore, SeqIds} = + {GuidsByStore, SeqIds} = lists:foldl( fun (ack_not_on_disk, Acc) -> Acc; - ({ack_index_and_store, MsgId, SeqId, MsgStore}, {Dict, SeqIds}) -> - {rabbit_misc:dict_cons(MsgStore, MsgId, Dict), [SeqId | SeqIds]} + ({ack_index_and_store, Guid, SeqId, MsgStore}, {Dict, SeqIds}) -> + {rabbit_misc:dict_cons(MsgStore, Guid, Dict), [SeqId | SeqIds]} end, {dict:new(), []}, AckTags), IndexState1 = case SeqIds of [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = dict:fold(fun (MsgStore, MsgIds, ok) -> - rabbit_msg_store:remove(MsgStore, MsgIds) - end, ok, MsgIdsByStore), - PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), + PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of error -> 0; - {ok, MsgIds} -> length(MsgIds) + {ok, Guids} -> length(Guids) end, State #vqstate { index_state = IndexState1, persistent_count = PCount1 }. @@ -533,20 +533,20 @@ delete_and_terminate(State) -> %% msg_store:release so that the cache isn't held full of msgs which %% are now at the tail of the queue. requeue(MsgsWithAckTags, State) -> - {SeqIds, MsgIdsByStore, + {SeqIds, GuidsByStore, State1 = #vqstate { index_state = IndexState, persistent_count = PCount, persistent_store = PersistentStore }} = lists:foldl( - fun ({Msg = #basic_message { guid = MsgId }, AckTag}, + fun ({Msg = #basic_message { guid = Guid }, AckTag}, {SeqIdsAcc, Dict, StateN}) -> {SeqIdsAcc1, Dict1, MsgOnDisk} = case AckTag of ack_not_on_disk -> {SeqIdsAcc, Dict, false}; - {ack_index_and_store, MsgId, SeqId, MsgStore} -> + {ack_index_and_store, Guid, SeqId, MsgStore} -> {[SeqId | SeqIdsAcc], - rabbit_misc:dict_cons(MsgStore, MsgId, Dict), + rabbit_misc:dict_cons(MsgStore, Guid, Dict), true} end, {_SeqId, StateN1} = @@ -558,21 +558,21 @@ requeue(MsgsWithAckTags, State) -> [] -> IndexState; _ -> rabbit_queue_index:write_acks(SeqIds, IndexState) end, - ok = dict:fold(fun (MsgStore, MsgIds, ok) -> - rabbit_msg_store:release(MsgStore, MsgIds) - end, ok, MsgIdsByStore), - PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:release(MsgStore, Guids) + end, ok, GuidsByStore), + PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of error -> 0; - {ok, MsgIds} -> length(MsgIds) + {ok, Guids} -> length(Guids) end, State1 #vqstate { index_state = IndexState1, persistent_count = PCount1 }. -tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId }, +tx_publish(Msg = #basic_message { is_persistent = true, guid = Guid }, State = #vqstate { msg_store_clients = MSCState, persistent_store = PersistentStore }) -> MsgStatus = #msg_status { - msg = Msg, guid = MsgId, seq_id = undefined, is_persistent = true, + msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true, is_delivered = false, msg_on_disk = false, index_on_disk = false }, {#msg_status { msg_on_disk = true }, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState), @@ -591,8 +591,8 @@ tx_commit(Pubs, AckTags, From, State = #vqstate { persistent_store = PersistentStore }) -> %% If we are a non-durable queue, or we have no persistent pubs, %% we can skip the msg_store loop. - PersistentMsgIds = persistent_guids(Pubs), - IsTransientPubs = [] == PersistentMsgIds, + PersistentGuids = persistent_guids(Pubs), + IsTransientPubs = [] == PersistentGuids, case IsTransientPubs orelse ?TRANSIENT_MSG_STORE == PersistentStore of true -> @@ -601,7 +601,7 @@ tx_commit(Pubs, AckTags, From, State = false -> Self = self(), ok = rabbit_msg_store:sync( - ?PERSISTENT_MSG_STORE, PersistentMsgIds, + ?PERSISTENT_MSG_STORE, PersistentGuids, fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue( Self, fun (StateN) -> tx_commit_post_msg_store( @@ -636,7 +636,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms}, persistent_store = PersistentStore }) -> Acks = lists:flatten(SAcks), State1 = ack(Acks, State), - AckSeqIds = lists:foldl(fun ({ack_index_and_store, _MsgId, + AckSeqIds = lists:foldl(fun ({ack_index_and_store, _Guid, SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) -> [SeqId | SeqIdsAcc]; (_, SeqIdsAcc) -> @@ -700,13 +700,13 @@ update_rate(Now, Then, Count, {OThen, OCount}) -> {Avg, {Then, Count}}. persistent_guids(Pubs) -> - [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs, + [Guid || Obj = #basic_message { guid = Guid } <- Pubs, Obj #basic_message.is_persistent]. betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> {Filtered, IndexState1} = lists:foldr( - fun ({MsgId, SeqId, IsPersistent, IsDelivered}, + fun ({Guid, SeqId, IsPersistent, IsDelivered}, {FilteredAcc, IndexStateAcc}) -> case SeqId < TransientThreshold andalso not IsPersistent of true -> @@ -722,7 +722,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) -> case SeqId < SeqIdLimit of true -> {[#msg_status { msg = undefined, - guid = MsgId, + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, @@ -838,12 +838,12 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState, end. remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> - {_PersistentStore, Count, MsgIdsByStore, SeqIds, IndexState1} = + {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} = Fold(fun remove_queue_entries1/2, {PersistentStore, 0, dict:new(), [], IndexState}, Q), - ok = dict:fold(fun (MsgStore, MsgIds, ok) -> - rabbit_msg_store:remove(MsgStore, MsgIds) - end, ok, MsgIdsByStore), + ok = dict:fold(fun (MsgStore, Guids, ok) -> + rabbit_msg_store:remove(MsgStore, Guids) + end, ok, GuidsByStore), IndexState2 = case SeqIds of [] -> IndexState1; @@ -852,18 +852,18 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) -> {Count, IndexState2}. remove_queue_entries1( - #msg_status { guid = MsgId, seq_id = SeqId, + #msg_status { guid = Guid, seq_id = SeqId, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk, is_persistent = IsPersistent }, - {PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) -> - MsgIdsByStore1 = + {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) -> + GuidsByStore1 = case {MsgOnDisk, IsPersistent} of {true, true} -> - rabbit_misc:dict_cons(PersistentStore, MsgId, MsgIdsByStore); + rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore); {true, false} -> - rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, MsgId, MsgIdsByStore); + rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore); {false, _} -> - MsgIdsByStore + GuidsByStore end, SeqIdsAcc1 = case IndexOnDisk of true -> [SeqId | SeqIdsAcc]; @@ -874,7 +874,7 @@ remove_queue_entries1( SeqId, IndexStateN); false -> IndexStateN end, - {PersistentStore, CountN + 1, MsgIdsByStore1, SeqIdsAcc1, IndexStateN1}. + {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}. fetch_from_q3_or_delta(State = #vqstate { q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount }, @@ -889,12 +889,12 @@ fetch_from_q3_or_delta(State = #vqstate { true = queue:is_empty(Q1), %% ASSERTION {empty, State}; {{value, IndexOnDisk, MsgStatus = #msg_status { - msg = undefined, guid = MsgId, + msg = undefined, guid = Guid, is_persistent = IsPersistent }}, Q3a} -> {{ok, Msg = #basic_message { is_persistent = IsPersistent, - guid = MsgId }}, MSCState1} = + guid = Guid }}, MSCState1} = read_from_msg_store( - PersistentStore, MSCState, IsPersistent, MsgId), + PersistentStore, MSCState, IsPersistent, Guid), Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4), RamIndexCount1 = case IndexOnDisk of true -> RamIndexCount; @@ -978,12 +978,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount, end end. -publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId }, +publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid }, IsDelivered, MsgOnDisk, State = #vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount, persistent_count = PCount }) -> MsgStatus = #msg_status { - msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent, + msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered, msg_on_disk = MsgOnDisk, index_on_disk = false }, PCount1 = PCount + case IsPersistent of @@ -1084,11 +1084,11 @@ with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false, {Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT), {Result, {MSCStateP, {MSCStateT1, TRef}}}. -read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) -> +read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) -> with_msg_store_state( PersistentStore, MSCState, IsPersistent, fun (MsgStore, MSCState1) -> - rabbit_msg_store:read(MsgStore, MsgId, MSCState1) + rabbit_msg_store:read(MsgStore, Guid, MSCState1) end). maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = @@ -1096,7 +1096,7 @@ maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus = {MsgStatus, MSCState}; maybe_write_msg_to_disk(PersistentStore, Force, MsgStatus = #msg_status { - msg = Msg, guid = MsgId, + msg = Msg, guid = Guid, is_persistent = IsPersistent }, MSCState) when Force orelse IsPersistent -> {ok, MSCState1} = @@ -1104,7 +1104,7 @@ maybe_write_msg_to_disk(PersistentStore, Force, PersistentStore, MSCState, IsPersistent, fun (MsgStore, MSCState2) -> rabbit_msg_store:write( - MsgStore, MsgId, ensure_binary_properties(Msg), MSCState2) + MsgStore, Guid, ensure_binary_properties(Msg), MSCState2) end), {MsgStatus #msg_status { msg_on_disk = true }, MSCState1}; maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) -> @@ -1115,13 +1115,13 @@ maybe_write_index_to_disk(_Force, MsgStatus = true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION {MsgStatus, IndexState}; maybe_write_index_to_disk(Force, MsgStatus = #msg_status { - guid = MsgId, seq_id = SeqId, + guid = Guid, seq_id = SeqId, is_persistent = IsPersistent, is_delivered = IsDelivered }, IndexState) when Force orelse IsPersistent -> true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION IndexState1 = rabbit_queue_index:write_published( - MsgId, SeqId, IsPersistent, IndexState), + Guid, SeqId, IsPersistent, IndexState), {MsgStatus #msg_status { index_on_disk = true }, case IsDelivered of true -> rabbit_queue_index:write_delivered(SeqId, IndexState1); |
