summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@lshift.net>2010-04-12 12:21:50 +0100
committerMatthew Sackman <matthew@lshift.net>2010-04-12 12:21:50 +0100
commit9f3b55b939b29c46188e7e90937bf8edc204793f (patch)
tree3af622c044ac012f8ffdb2d51dc9fcbc93de7006
parent703a4168d0f612544f657d0f51bdd8710f5dca1b (diff)
downloadrabbitmq-server-git-9f3b55b939b29c46188e7e90937bf8edc204793f.tar.gz
MsgId => Guid. All tests still pass. The distinction between msg_ids and guids is now complete
-rw-r--r--src/rabbit_msg_file.erl24
-rw-r--r--src/rabbit_msg_store.erl262
-rw-r--r--src/rabbit_queue_index.erl86
-rw-r--r--src/rabbit_tests.erl166
-rw-r--r--src/rabbit_variable_queue.erl108
5 files changed, 323 insertions, 323 deletions
diff --git a/src/rabbit_msg_file.erl b/src/rabbit_msg_file.erl
index 0edeb4698d..792f0efaba 100644
--- a/src/rabbit_msg_file.erl
+++ b/src/rabbit_msg_file.erl
@@ -64,14 +64,14 @@
%%----------------------------------------------------------------------------
-append(FileHdl, MsgId, MsgBody)
- when is_binary(MsgId) andalso size(MsgId) =< ?GUID_SIZE_BYTES ->
+append(FileHdl, Guid, MsgBody)
+ when is_binary(Guid) andalso size(Guid) =< ?GUID_SIZE_BYTES ->
MsgBodyBin = term_to_binary(MsgBody),
MsgBodyBinSize = size(MsgBodyBin),
Size = MsgBodyBinSize + ?GUID_SIZE_BYTES,
case file_handle_cache:append(FileHdl,
<<Size:?INTEGER_SIZE_BITS,
- MsgId:?GUID_SIZE_BYTES/binary,
+ Guid:?GUID_SIZE_BYTES/binary,
MsgBodyBin:MsgBodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>) of
ok -> {ok, Size + ?FILE_PACKING_ADJUSTMENT};
@@ -83,10 +83,10 @@ read(FileHdl, TotalSize) ->
BodyBinSize = Size - ?GUID_SIZE_BYTES,
case file_handle_cache:read(FileHdl, TotalSize) of
{ok, <<Size:?INTEGER_SIZE_BITS,
- MsgId:?GUID_SIZE_BYTES/binary,
+ Guid:?GUID_SIZE_BYTES/binary,
MsgBodyBin:BodyBinSize/binary,
?WRITE_OK_MARKER:?WRITE_OK_SIZE_BITS>>} ->
- {ok, {MsgId, binary_to_term(MsgBodyBin)}};
+ {ok, {Guid, binary_to_term(MsgBodyBin)}};
KO -> KO
end.
@@ -97,8 +97,8 @@ scan(FileHdl, Offset, Acc) ->
eof -> {ok, Acc, Offset};
{corrupted, NextOffset} ->
scan(FileHdl, NextOffset, Acc);
- {ok, {MsgId, TotalSize, NextOffset}} ->
- scan(FileHdl, NextOffset, [{MsgId, TotalSize, Offset} | Acc]);
+ {ok, {Guid, TotalSize, NextOffset}} ->
+ scan(FileHdl, NextOffset, [{Guid, TotalSize, Offset} | Acc]);
_KO ->
%% bad message, but we may still have recovered some valid messages
{ok, Acc, Offset}
@@ -108,9 +108,9 @@ read_next(FileHdl, Offset) ->
case file_handle_cache:read(FileHdl, ?SIZE_AND_GUID_BYTES) of
%% Here we take option 5 from
%% http://www.erlang.org/cgi-bin/ezmlm-cgi?2:mss:1569 in which
- %% we read the MsgId as a number, and then convert it back to
+ %% we read the Guid as a number, and then convert it back to
%% a binary in order to work around bugs in Erlang's GC.
- {ok, <<Size:?INTEGER_SIZE_BITS, MsgIdNum:?GUID_SIZE_BITS>>} ->
+ {ok, <<Size:?INTEGER_SIZE_BITS, GuidNum:?GUID_SIZE_BITS>>} ->
case Size of
0 -> eof; %% Nothing we can do other than stop
_ ->
@@ -123,9 +123,9 @@ read_next(FileHdl, Offset) ->
case file_handle_cache:read(FileHdl, 1) of
{ok,
<<?WRITE_OK_MARKER: ?WRITE_OK_SIZE_BITS>>} ->
- <<MsgId:?GUID_SIZE_BYTES/binary>> =
- <<MsgIdNum:?GUID_SIZE_BITS>>,
- {ok, {MsgId, TotalSize, NextOffset}};
+ <<Guid:?GUID_SIZE_BYTES/binary>> =
+ <<GuidNum:?GUID_SIZE_BITS>>,
+ {ok, {Guid, TotalSize, NextOffset}};
{ok, _SomeOtherData} ->
{corrupted, NextOffset};
KO -> KO
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 1a7085a2bd..b6d6c5daa2 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -152,7 +152,7 @@
%% The components:
%%
-%% MsgLocation: this is a mapping from MsgId to #msg_location{}:
+%% MsgLocation: this is a mapping from Guid to #msg_location{}:
%% {Guid, RefCount, File, Offset, TotalSize}
%% By default, it's in ets, but it's also pluggable.
%% FileSummary: this is an ets table which contains:
@@ -307,29 +307,29 @@ start_link(Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit) ->
[Server, Dir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit],
[{timeout, infinity}]).
-write(Server, MsgId, Msg, CState =
+write(Server, Guid, Msg, CState =
#client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
- ok = add_to_cache(CurFileCacheEts, MsgId, Msg),
- {gen_server2:cast(Server, {write, MsgId, Msg}), CState}.
+ ok = add_to_cache(CurFileCacheEts, Guid, Msg),
+ {gen_server2:cast(Server, {write, Guid, Msg}), CState}.
-read(Server, MsgId, CState =
+read(Server, Guid, CState =
#client_msstate { dedup_cache_ets = DedupCacheEts,
cur_file_cache_ets = CurFileCacheEts }) ->
%% 1. Check the dedup cache
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
+ case fetch_and_increment_cache(DedupCacheEts, Guid) of
not_found ->
%% 2. Check the cur file cache
- case ets:lookup(CurFileCacheEts, MsgId) of
+ case ets:lookup(CurFileCacheEts, Guid) of
[] ->
Defer = fun() -> {gen_server2:pcall(
- Server, 2, {read, MsgId}, infinity),
+ Server, 2, {read, Guid}, infinity),
CState} end,
- case index_lookup(MsgId, CState) of
+ case index_lookup(Guid, CState) of
not_found -> Defer();
MsgLocation -> client_read1(Server, MsgLocation, Defer,
CState)
end;
- [{MsgId, Msg, _CacheRefCount}] ->
+ [{Guid, Msg, _CacheRefCount}] ->
%% Although we've found it, we don't know the
%% refcount, so can't insert into dedup cache
{{ok, Msg}, CState}
@@ -338,10 +338,10 @@ read(Server, MsgId, CState =
{{ok, Msg}, CState}
end.
-contains(Server, MsgId) -> gen_server2:call(Server, {contains, MsgId}, infinity).
-remove(Server, MsgIds) -> gen_server2:cast(Server, {remove, MsgIds}).
-release(Server, MsgIds) -> gen_server2:cast(Server, {release, MsgIds}).
-sync(Server, MsgIds, K) -> gen_server2:cast(Server, {sync, MsgIds, K}).
+contains(Server, Guid) -> gen_server2:call(Server, {contains, Guid}, infinity).
+remove(Server, Guids) -> gen_server2:cast(Server, {remove, Guids}).
+release(Server, Guids) -> gen_server2:cast(Server, {release, Guids}).
+sync(Server, Guids, K) -> gen_server2:cast(Server, {sync, Guids, K}).
sync(Server) -> gen_server2:pcast(Server, 8, sync). %% internal
gc_done(Server, Reclaimed, Source, Destination) ->
@@ -381,37 +381,37 @@ clean(Server, BaseDir) ->
%% Client-side-only helpers
%%----------------------------------------------------------------------------
-add_to_cache(CurFileCacheEts, MsgId, Msg) ->
- case ets:insert_new(CurFileCacheEts, {MsgId, Msg, 1}) of
+add_to_cache(CurFileCacheEts, Guid, Msg) ->
+ case ets:insert_new(CurFileCacheEts, {Guid, Msg, 1}) of
true ->
ok;
false ->
try
- ets:update_counter(CurFileCacheEts, MsgId, {3, +1}),
+ ets:update_counter(CurFileCacheEts, Guid, {3, +1}),
ok
- catch error:badarg -> add_to_cache(CurFileCacheEts, MsgId, Msg)
+ catch error:badarg -> add_to_cache(CurFileCacheEts, Guid, Msg)
end
end.
-client_read1(Server, #msg_location { guid = MsgId, file = File } =
+client_read1(Server, #msg_location { guid = Guid, file = File } =
MsgLocation, Defer, CState =
#client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
[] -> %% File has been GC'd and no longer exists. Go around again.
- read(Server, MsgId, CState);
+ read(Server, Guid, CState);
[#file_summary { locked = Locked, right = Right }] ->
client_read2(Server, Locked, Right, MsgLocation, Defer, CState)
end.
client_read2(_Server, false, undefined,
- #msg_location { guid = MsgId, ref_count = RefCount }, Defer,
+ #msg_location { guid = Guid, ref_count = RefCount }, Defer,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
dedup_cache_ets = DedupCacheEts }) ->
- case ets:lookup(CurFileCacheEts, MsgId) of
+ case ets:lookup(CurFileCacheEts, Guid) of
[] ->
Defer(); %% may have rolled over
- [{MsgId, Msg, _CacheRefCount}] ->
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
+ [{Guid, Msg, _CacheRefCount}] ->
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{{ok, Msg}, CState}
end;
client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
@@ -420,7 +420,7 @@ client_read2(_Server, true, _Right, _MsgLocation, Defer, _CState) ->
%% the safest and simplest thing to do.
Defer();
client_read2(Server, false, _Right,
- #msg_location { guid = MsgId, ref_count = RefCount, file = File },
+ #msg_location { guid = Guid, ref_count = RefCount, file = File },
Defer, CState =
#client_msstate { file_handles_ets = FileHandlesEts,
file_summary_ets = FileSummaryEts,
@@ -430,7 +430,7 @@ client_read2(Server, false, _Right,
%% finished.
try ets:update_counter(FileSummaryEts, File, {#file_summary.readers, +1})
catch error:badarg -> %% the File has been GC'd and deleted. Go around.
- read(Server, MsgId, CState)
+ read(Server, Guid, CState)
end,
Release = fun() -> ets:update_counter(FileSummaryEts, File,
{#file_summary.readers, -1})
@@ -452,7 +452,7 @@ client_read2(Server, false, _Right,
%% readers, msg_store ets:deletes (and unlocks the dest)
try Release(),
Defer()
- catch error:badarg -> read(Server, MsgId, CState)
+ catch error:badarg -> read(Server, Guid, CState)
end;
false ->
%% Ok, we're definitely safe to continue - a GC can't
@@ -468,7 +468,7 @@ client_read2(Server, false, _Right,
%% badarg scenario above, but we don't have a missing file
%% - we just have the /wrong/ file).
- case index_lookup(MsgId, CState) of
+ case index_lookup(Guid, CState) of
MsgLocation = #msg_location { file = File } ->
%% Still the same file.
%% This is fine to fail (already exists)
@@ -476,7 +476,7 @@ client_read2(Server, false, _Right,
CState1 = close_all_indicated(CState),
{Msg, CState2} =
read_from_disk(MsgLocation, CState1, DedupCacheEts),
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId,
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid,
Msg),
Release(), %% this MUST NOT fail with badarg
{{ok, Msg}, CState2};
@@ -589,12 +589,12 @@ init([Server, BaseDir, ClientRefs, MsgRefDeltaGen, MsgRefDeltaGenInit]) ->
gc_pid = GCPid }, hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.
-handle_call({read, MsgId}, From, State) ->
- State1 = read_message(MsgId, From, State),
+handle_call({read, Guid}, From, State) ->
+ State1 = read_message(Guid, From, State),
noreply(State1);
-handle_call({contains, MsgId}, From, State) ->
- State1 = contains_message(MsgId, From, State),
+handle_call({contains, Guid}, From, State) ->
+ State1 = contains_message(Guid, From, State),
noreply(State1);
handle_call({new_client_state, CRef}, _From,
@@ -617,21 +617,21 @@ handle_call({delete_client, CRef}, _From,
reply(ok,
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }).
-handle_cast({write, MsgId, Msg},
+handle_cast({write, Guid, Msg},
State = #msstate { current_file_handle = CurHdl,
current_file = CurFile,
sum_valid_data = SumValid,
sum_file_size = SumFileSize,
file_summary_ets = FileSummaryEts,
cur_file_cache_ets = CurFileCacheEts }) ->
- true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- case index_lookup(MsgId, State) of
+ true = 0 =< ets:update_counter(CurFileCacheEts, Guid, {3, -1}),
+ case index_lookup(Guid, State) of
not_found ->
%% New message, lots to do
{ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
- {ok, TotalSize} = rabbit_msg_file:append(CurHdl, MsgId, Msg),
+ {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg),
ok = index_insert(#msg_location {
- guid = MsgId, ref_count = 1, file = CurFile,
+ guid = Guid, ref_count = 1, file = CurFile,
offset = CurOffset, total_size = TotalSize },
State),
[#file_summary { valid_total_size = ValidTotalSize,
@@ -661,34 +661,34 @@ handle_cast({write, MsgId, Msg},
#msg_location { ref_count = RefCount } ->
%% We already know about it, just update counter. Only
%% update field otherwise bad interaction with concurrent GC
- ok = index_update_fields(MsgId,
+ ok = index_update_fields(Guid,
{#msg_location.ref_count, RefCount + 1},
State),
noreply(State)
end;
-handle_cast({remove, MsgIds}, State) ->
+handle_cast({remove, Guids}, State) ->
State1 = lists:foldl(
- fun (MsgId, State2) -> remove_message(MsgId, State2) end,
- State, MsgIds),
+ fun (Guid, State2) -> remove_message(Guid, State2) end,
+ State, Guids),
noreply(maybe_compact(State1));
-handle_cast({release, MsgIds}, State =
+handle_cast({release, Guids}, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
lists:foreach(
- fun (MsgId) -> decrement_cache(DedupCacheEts, MsgId) end, MsgIds),
+ fun (Guid) -> decrement_cache(DedupCacheEts, Guid) end, Guids),
noreply(State);
-handle_cast({sync, MsgIds, K},
+handle_cast({sync, Guids, K},
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
on_sync = Syncs }) ->
{ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl),
- case lists:any(fun (MsgId) ->
+ case lists:any(fun (Guid) ->
#msg_location { file = File, offset = Offset } =
- index_lookup(MsgId, State),
+ index_lookup(Guid, State),
File =:= CurFile andalso Offset >= SyncOffset
- end, MsgIds) of
+ end, Guids) of
false -> K(),
noreply(State);
true -> noreply(State #msstate { on_sync = [K | Syncs] })
@@ -821,13 +821,13 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
State1 #msstate { on_sync = [] }
end.
-read_message(MsgId, From, State =
+read_message(Guid, From, State =
#msstate { dedup_cache_ets = DedupCacheEts }) ->
- case index_lookup(MsgId, State) of
+ case index_lookup(Guid, State) of
not_found -> gen_server2:reply(From, not_found),
State;
MsgLocation ->
- case fetch_and_increment_cache(DedupCacheEts, MsgId) of
+ case fetch_and_increment_cache(DedupCacheEts, Guid) of
not_found ->
read_message1(From, MsgLocation, State);
Msg ->
@@ -836,7 +836,7 @@ read_message(MsgId, From, State =
end
end.
-read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
+read_message1(From, #msg_location { guid = Guid, ref_count = RefCount,
file = File, offset = Offset } = MsgLoc,
State = #msstate { current_file = CurFile,
current_file_handle = CurHdl,
@@ -847,7 +847,7 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
true ->
{Msg, State1} =
%% can return [] if msg in file existed on startup
- case ets:lookup(CurFileCacheEts, MsgId) of
+ case ets:lookup(CurFileCacheEts, Guid) of
[] ->
ok = case {ok, Offset} >=
file_handle_cache:current_raw_offset(CurHdl) of
@@ -855,10 +855,10 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
false -> ok
end,
read_from_disk(MsgLoc, State, DedupCacheEts);
- [{MsgId, Msg1, _CacheRefCount}] ->
+ [{Guid, Msg1, _CacheRefCount}] ->
{Msg1, State}
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
gen_server2:reply(From, {ok, Msg}),
State1;
false ->
@@ -866,7 +866,7 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
ets:lookup(FileSummaryEts, File),
case Locked of
true ->
- add_to_pending_gc_completion({read, MsgId, From}, State);
+ add_to_pending_gc_completion({read, Guid, From}, State);
false ->
{Msg, State1} = read_from_disk(MsgLoc, State, DedupCacheEts),
gen_server2:reply(From, {ok, Msg}),
@@ -874,36 +874,36 @@ read_message1(From, #msg_location { guid = MsgId, ref_count = RefCount,
end
end.
-read_from_disk(#msg_location { guid = MsgId, ref_count = RefCount,
+read_from_disk(#msg_location { guid = Guid, ref_count = RefCount,
file = File, offset = Offset,
total_size = TotalSize }, State,
DedupCacheEts) ->
{Hdl, State1} = get_read_handle(File, State),
{ok, Offset} = file_handle_cache:position(Hdl, Offset),
- {ok, {MsgId, Msg}} =
+ {ok, {Guid, Msg}} =
case rabbit_msg_file:read(Hdl, TotalSize) of
- {ok, {MsgId, _}} = Obj ->
+ {ok, {Guid, _}} = Obj ->
Obj;
Rest ->
throw({error, {misread, [{old_state, State},
{file_num, File},
{offset, Offset},
- {guid, MsgId},
+ {guid, Guid},
{read, Rest},
{proc_dict, get()}
]}})
end,
- ok = maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg),
+ ok = maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg),
{Msg, State1}.
-maybe_insert_into_cache(DedupCacheEts, RefCount, MsgId, Msg)
+maybe_insert_into_cache(DedupCacheEts, RefCount, Guid, Msg)
when RefCount > 1 ->
- insert_into_cache(DedupCacheEts, MsgId, Msg);
-maybe_insert_into_cache(_DedupCacheEts, _RefCount, _MsgId, _Msg) ->
+ insert_into_cache(DedupCacheEts, Guid, Msg);
+maybe_insert_into_cache(_DedupCacheEts, _RefCount, _Guid, _Msg) ->
ok.
-contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
- case index_lookup(MsgId, State) of
+contains_message(Guid, From, State = #msstate { gc_active = GCActive }) ->
+ case index_lookup(Guid, State) of
not_found ->
gen_server2:reply(From, false),
State;
@@ -911,34 +911,34 @@ contains_message(MsgId, From, State = #msstate { gc_active = GCActive }) ->
case GCActive of
{A, B} when File == A orelse File == B ->
add_to_pending_gc_completion(
- {contains, MsgId, From}, State);
+ {contains, Guid, From}, State);
_ ->
gen_server2:reply(From, true),
State
end
end.
-remove_message(MsgId, State = #msstate { sum_valid_data = SumValid,
+remove_message(Guid, State = #msstate { sum_valid_data = SumValid,
file_summary_ets = FileSummaryEts,
dedup_cache_ets = DedupCacheEts }) ->
#msg_location { ref_count = RefCount, file = File,
offset = Offset, total_size = TotalSize } =
- index_lookup(MsgId, State),
+ index_lookup(Guid, State),
case RefCount of
1 ->
%% don't remove from CUR_FILE_CACHE_ETS_NAME here because
%% there may be further writes in the mailbox for the same
%% msg.
- ok = remove_cache_entry(DedupCacheEts, MsgId),
+ ok = remove_cache_entry(DedupCacheEts, Guid),
[#file_summary { valid_total_size = ValidTotalSize,
contiguous_top = ContiguousTop,
locked = Locked }] =
ets:lookup(FileSummaryEts, File),
case Locked of
true ->
- add_to_pending_gc_completion({remove, MsgId}, State);
+ add_to_pending_gc_completion({remove, Guid}, State);
false ->
- ok = index_delete(MsgId, State),
+ ok = index_delete(Guid, State),
ContiguousTop1 = lists:min([ContiguousTop, Offset]),
ValidTotalSize1 = ValidTotalSize - TotalSize,
true = ets:update_element(
@@ -949,9 +949,9 @@ remove_message(MsgId, State = #msstate { sum_valid_data = SumValid,
State1 #msstate { sum_valid_data = SumValid - TotalSize }
end;
_ when 1 < RefCount ->
- ok = decrement_cache(DedupCacheEts, MsgId),
+ ok = decrement_cache(DedupCacheEts, Guid),
%% only update field, otherwise bad interaction with concurrent GC
- ok = index_update_fields(MsgId,
+ ok = index_update_fields(Guid,
{#msg_location.ref_count, RefCount - 1},
State),
State
@@ -967,12 +967,12 @@ run_pending(State = #msstate { pending_gc_completion = Pending }) ->
State1 = State #msstate { pending_gc_completion = [] },
lists:foldl(fun run_pending/2, State1, lists:reverse(Pending)).
-run_pending({read, MsgId, From}, State) ->
- read_message(MsgId, From, State);
-run_pending({contains, MsgId, From}, State) ->
- contains_message(MsgId, From, State);
-run_pending({remove, MsgId}, State) ->
- remove_message(MsgId, State).
+run_pending({read, Guid, From}, State) ->
+ read_message(Guid, From, State);
+run_pending({contains, Guid, From}, State) ->
+ contains_message(Guid, From, State);
+run_pending({remove, Guid}, State) ->
+ remove_message(Guid, State).
open_file(Dir, FileName, Mode) ->
file_handle_cache:open(form_filename(Dir, FileName), ?BINARY_MODE ++ Mode,
@@ -1091,45 +1091,45 @@ scan_file_for_valid_messages(Dir, FileName) ->
%% message cache helper functions
%%----------------------------------------------------------------------------
-remove_cache_entry(DedupCacheEts, MsgId) ->
- true = ets:delete(DedupCacheEts, MsgId),
+remove_cache_entry(DedupCacheEts, Guid) ->
+ true = ets:delete(DedupCacheEts, Guid),
ok.
-fetch_and_increment_cache(DedupCacheEts, MsgId) ->
- case ets:lookup(DedupCacheEts, MsgId) of
+fetch_and_increment_cache(DedupCacheEts, Guid) ->
+ case ets:lookup(DedupCacheEts, Guid) of
[] ->
not_found;
- [{_MsgId, Msg, _RefCount}] ->
+ [{_Guid, Msg, _RefCount}] ->
try
- ets:update_counter(DedupCacheEts, MsgId, {3, 1})
+ ets:update_counter(DedupCacheEts, Guid, {3, 1})
catch error:badarg ->
%% someone has deleted us in the meantime, insert us
- ok = insert_into_cache(DedupCacheEts, MsgId, Msg)
+ ok = insert_into_cache(DedupCacheEts, Guid, Msg)
end,
Msg
end.
-decrement_cache(DedupCacheEts, MsgId) ->
- true = try case ets:update_counter(DedupCacheEts, MsgId, {3, -1}) of
- N when N =< 0 -> true = ets:delete(DedupCacheEts, MsgId);
+decrement_cache(DedupCacheEts, Guid) ->
+ true = try case ets:update_counter(DedupCacheEts, Guid, {3, -1}) of
+ N when N =< 0 -> true = ets:delete(DedupCacheEts, Guid);
_N -> true
end
catch error:badarg ->
- %% MsgId is not in there because although it's been
+ %% Guid is not in there because although it's been
%% delivered, it's never actually been read (think:
%% persistent message held in RAM)
true
end,
ok.
-insert_into_cache(DedupCacheEts, MsgId, Msg) ->
- case ets:insert_new(DedupCacheEts, {MsgId, Msg, 1}) of
+insert_into_cache(DedupCacheEts, Guid, Msg) ->
+ case ets:insert_new(DedupCacheEts, {Guid, Msg, 1}) of
true -> ok;
false -> try
- ets:update_counter(DedupCacheEts, MsgId, {3, 1}),
+ ets:update_counter(DedupCacheEts, Guid, {3, 1}),
ok
catch error:badarg ->
- insert_into_cache(DedupCacheEts, MsgId, Msg)
+ insert_into_cache(DedupCacheEts, Guid, Msg)
end
end.
@@ -1172,17 +1172,17 @@ count_msg_refs(true, _Gen, _Seed, _State) ->
count_msg_refs(Gen, Seed, State) ->
case Gen(Seed) of
finished -> ok;
- {_MsgId, 0, Next} -> count_msg_refs(Gen, Next, State);
- {MsgId, Delta, Next} ->
- ok = case index_lookup(MsgId, State) of
+ {_Guid, 0, Next} -> count_msg_refs(Gen, Next, State);
+ {Guid, Delta, Next} ->
+ ok = case index_lookup(Guid, State) of
not_found ->
- index_insert(#msg_location { guid = MsgId,
+ index_insert(#msg_location { guid = Guid,
ref_count = Delta },
State);
StoreEntry = #msg_location { ref_count = RefCount } ->
NewRefCount = RefCount + Delta,
case NewRefCount of
- 0 -> index_delete(MsgId, State);
+ 0 -> index_delete(Guid, State);
_ -> index_update(StoreEntry #msg_location {
ref_count = NewRefCount },
State)
@@ -1201,9 +1201,9 @@ recover_crashed_compactions(Dir, FileNames, TmpFileNames) ->
recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
NonTmpRelatedFileName = filename:rootname(TmpFileName) ++ ?FILE_EXTENSION,
true = lists:member(NonTmpRelatedFileName, FileNames),
- {ok, UncorruptedMessagesTmp, MsgIdsTmp} =
+ {ok, UncorruptedMessagesTmp, GuidsTmp} =
scan_file_for_valid_messages_guids(Dir, TmpFileName),
- {ok, UncorruptedMessages, MsgIds} =
+ {ok, UncorruptedMessages, Guids} =
scan_file_for_valid_messages_guids(Dir, NonTmpRelatedFileName),
%% 1) It's possible that everything in the tmp file is also in the
%% main file such that the main file is (prefix ++
@@ -1232,7 +1232,7 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% back to before any of the files in the tmp file and copy
%% them over again
TmpPath = form_filename(Dir, TmpFileName),
- case is_sublist(MsgIdsTmp, MsgIds) of
+ case is_sublist(GuidsTmp, Guids) of
true -> %% we're in case 1, 2 or 3 above. Just delete the tmp file
%% note this also catches the case when the tmp file
%% is empty
@@ -1243,13 +1243,13 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
%% there are no msgs in the tmp file then we would be in
%% the 'true' branch of this case, so we know the
%% lists:last call is safe.
- EldestTmpMsgId = lists:last(MsgIdsTmp),
- {MsgIds1, UncorruptedMessages1}
+ EldestTmpGuid = lists:last(GuidsTmp),
+ {Guids1, UncorruptedMessages1}
= case lists:splitwith(
- fun (MsgId) -> MsgId /= EldestTmpMsgId end, MsgIds) of
- {_MsgIds, []} -> %% no msgs from tmp in main
- {MsgIds, UncorruptedMessages};
- {Dropped, [EldestTmpMsgId | Rest]} ->
+ fun (Guid) -> Guid /= EldestTmpGuid end, Guids) of
+ {_Guids, []} -> %% no msgs from tmp in main
+ {Guids, UncorruptedMessages};
+ {Dropped, [EldestTmpGuid | Rest]} ->
%% Msgs in Dropped are in tmp, so forget them.
%% *cry*. Lists indexed from 1.
{Rest, lists:sublist(UncorruptedMessages,
@@ -1257,11 +1257,11 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
length(Rest))}
end,
%% The main file prefix should be contiguous
- {Top, MsgIds1} = find_contiguous_block_prefix(
+ {Top, Guids1} = find_contiguous_block_prefix(
lists:reverse(UncorruptedMessages1)),
%% we should have that none of the messages in the prefix
%% are in the tmp file
- true = is_disjoint(MsgIds1, MsgIdsTmp),
+ true = is_disjoint(Guids1, GuidsTmp),
%% must open with read flag, otherwise will stomp over contents
{ok, MainHdl} = open_file(
Dir, NonTmpRelatedFileName, [read | ?WRITE_MODE]),
@@ -1281,13 +1281,13 @@ recover_crashed_compactions1(Dir, FileNames, TmpFileName) ->
ok = file_handle_cache:close(MainHdl),
ok = file_handle_cache:delete(TmpHdl),
- {ok, _MainMessages, MsgIdsMain} =
+ {ok, _MainMessages, GuidsMain} =
scan_file_for_valid_messages_guids(
Dir, NonTmpRelatedFileName),
- %% check that everything in MsgIds1 is in MsgIdsMain
- true = is_sublist(MsgIds1, MsgIdsMain),
- %% check that everything in MsgIdsTmp is in MsgIdsMain
- true = is_sublist(MsgIdsTmp, MsgIdsMain)
+ %% check that everything in Guids1 is in GuidsMain
+ true = is_sublist(Guids1, GuidsMain),
+ %% check that everything in GuidsTmp is in GuidsMain
+ true = is_sublist(GuidsTmp, GuidsMain)
end,
ok.
@@ -1300,7 +1300,7 @@ is_disjoint(SmallerL, BiggerL) ->
scan_file_for_valid_messages_guids(Dir, FileName) ->
{ok, Messages, _FileSize} =
scan_file_for_valid_messages(Dir, FileName),
- {ok, Messages, [MsgId || {MsgId, _TotalSize, _FileOffset} <- Messages]}.
+ {ok, Messages, [Guid || {Guid, _TotalSize, _FileOffset} <- Messages]}.
%% Takes the list in *ascending* order (i.e. eldest message
%% first). This is the opposite of what scan_file_for_valid_messages
@@ -1309,14 +1309,14 @@ find_contiguous_block_prefix([]) -> {0, []};
find_contiguous_block_prefix(List) ->
find_contiguous_block_prefix(List, 0, []).
-find_contiguous_block_prefix([], ExpectedOffset, MsgIds) ->
- {ExpectedOffset, MsgIds};
-find_contiguous_block_prefix([{MsgId, TotalSize, ExpectedOffset} | Tail],
- ExpectedOffset, MsgIds) ->
+find_contiguous_block_prefix([], ExpectedOffset, Guids) ->
+ {ExpectedOffset, Guids};
+find_contiguous_block_prefix([{Guid, TotalSize, ExpectedOffset} | Tail],
+ ExpectedOffset, Guids) ->
ExpectedOffset1 = ExpectedOffset + TotalSize,
- find_contiguous_block_prefix(Tail, ExpectedOffset1, [MsgId | MsgIds]);
-find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, MsgIds) ->
- {ExpectedOffset, MsgIds}.
+ find_contiguous_block_prefix(Tail, ExpectedOffset1, [Guid | Guids]);
+find_contiguous_block_prefix([_MsgAfterGap | _Tail], ExpectedOffset, Guids) ->
+ {ExpectedOffset, Guids}.
build_index(true, _Files, State =
#msstate { file_summary_ets = FileSummaryEts }) ->
@@ -1373,8 +1373,8 @@ build_index_worker(
Dir, filenum_to_name(File)),
{ValidMessages, ValidTotalSize} =
lists:foldl(
- fun (Obj = {MsgId, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
- case index_lookup(MsgId, State) of
+ fun (Obj = {Guid, TotalSize, Offset}, {VMAcc, VTSAcc}) ->
+ case index_lookup(Guid, State) of
not_found -> {VMAcc, VTSAcc};
StoreEntry ->
ok = index_update(StoreEntry #msg_location {
@@ -1394,7 +1394,7 @@ build_index_worker(
%% file size.
[] -> {undefined, case ValidMessages of
[] -> 0;
- _ -> {_MsgId, TotalSize, Offset} =
+ _ -> {_Guid, TotalSize, Offset} =
lists:last(ValidMessages),
Offset + TotalSize
end};
@@ -1649,8 +1649,8 @@ find_unremoved_messages_in_file(File,
Dir, filenum_to_name(File)),
%% foldl will reverse so will end up with msgs in ascending offset order
lists:foldl(
- fun ({MsgId, _TotalSize, _Offset}, Acc) ->
- case Index:lookup(MsgId, IndexState) of
+ fun ({Guid, _TotalSize, _Offset}, Acc) ->
+ case Index:lookup(Guid, IndexState) of
Entry = #msg_location { file = File } -> [ Entry | Acc ];
_ -> Acc
end
@@ -1660,13 +1660,13 @@ copy_messages(WorkList, InitOffset, FinalOffset, SourceHdl, DestinationHdl,
Destination, {_FileSummaryEts, _Dir, Index, IndexState}) ->
{FinalOffset, BlockStart1, BlockEnd1} =
lists:foldl(
- fun (#msg_location { guid = MsgId, offset = Offset,
+ fun (#msg_location { guid = Guid, offset = Offset,
total_size = TotalSize },
{CurOffset, BlockStart, BlockEnd}) ->
%% CurOffset is in the DestinationFile.
%% Offset, BlockStart and BlockEnd are in the SourceFile
%% update MsgLocation to reflect change of file and offset
- ok = Index:update_fields(MsgId,
+ ok = Index:update_fields(Guid,
[{#msg_location.file, Destination},
{#msg_location.offset, CurOffset}],
IndexState),
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 556c6968e4..2d9b66738f 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -103,7 +103,7 @@
%% and seeding the message store on start up.
%% Note that in general, the representation of a message's state as
-%% the tuple: {('no_pub'|{MsgId, IsPersistent}), ('del'|'no_del'),
+%% the tuple: {('no_pub'|{Guid, IsPersistent}), ('del'|'no_del'),
%% ('ack'|'no_ack')} is richer than strictly necessary for most
%% operations. However, for startup, and to ensure the safe and
%% correct combination of journal entries with entries read from the
@@ -265,12 +265,12 @@ init(Name, MsgStoreRecovered) ->
Segment2 =
#segment { pubs = PubCount1, acks = AckCount1 } =
array:sparse_foldl(
- fun (RelSeq, {{MsgId, _IsPersistent}, Del, no_ack},
+ fun (RelSeq, {{Guid, _IsPersistent}, Del, no_ack},
Segment3) ->
Segment4 =
maybe_add_to_journal(
rabbit_msg_store:contains(
- ?PERSISTENT_MSG_STORE, MsgId),
+ ?PERSISTENT_MSG_STORE, Guid),
CleanShutdown, Del, RelSeq, Segment3),
Segment4
end, Segment1 #segment { pubs = PubCount,
@@ -327,15 +327,15 @@ terminate_and_erase(State) ->
ok = delete_queue_directory(State1 #qistate.dir),
State1.
-write_published(MsgId, SeqId, IsPersistent, State) when is_binary(MsgId) ->
- ?GUID_BYTES = size(MsgId),
+write_published(Guid, SeqId, IsPersistent, State) when is_binary(Guid) ->
+ ?GUID_BYTES = size(Guid),
{JournalHdl, State1} = get_journal_handle(State),
ok = file_handle_cache:append(
JournalHdl, [<<(case IsPersistent of
true -> ?PUB_PERSIST_JPREFIX;
false -> ?PUB_TRANS_JPREFIX
- end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, MsgId]),
- maybe_flush_journal(add_to_journal(SeqId, {MsgId, IsPersistent}, State1)).
+ end):?JPREFIX_BITS, SeqId:?SEQ_BITS>>, Guid]),
+ maybe_flush_journal(add_to_journal(SeqId, {Guid, IsPersistent}, State1)).
write_delivered(SeqId, State) ->
{JournalHdl, State1} = get_journal_handle(State),
@@ -396,8 +396,8 @@ read_segment_entries(InitSeqId, State = #qistate { segments = Segments,
{SegEntries, _PubCount, _AckCount, Segment1} = load_segment(false, Segment),
#segment { journal_entries = JEntries } = Segment1,
{array:sparse_foldr(
- fun (RelSeq, {{MsgId, IsPersistent}, IsDelivered, no_ack}, Acc) ->
- [ {MsgId, reconstruct_seq_id(Seg, RelSeq),
+ fun (RelSeq, {{Guid, IsPersistent}, IsDelivered, no_ack}, Acc) ->
+ [ {Guid, reconstruct_seq_id(Seg, RelSeq),
IsPersistent, IsDelivered == del} | Acc ]
end, [], journal_plus_segment(JEntries, SegEntries)),
State #qistate { segments = segment_store(Segment1, Segments) }}.
@@ -492,7 +492,7 @@ queue_index_walker({[], Gatherer}) ->
case gatherer:fetch(Gatherer) of
finished -> rabbit_misc:unlink_and_capture_exit(Gatherer),
finished;
- {value, {MsgId, Count}} -> {MsgId, Count, {[], Gatherer}}
+ {value, {Guid, Count}} -> {Guid, Count, {[], Gatherer}}
end;
queue_index_walker({[QueueName | QueueNames], Gatherer}) ->
Child = make_ref(),
@@ -519,9 +519,9 @@ queue_index_walker_reader(Gatherer, Ref, State, [Seg | SegNums]) ->
queue_index_walker_reader1(_Gatherer, State, []) ->
State;
queue_index_walker_reader1(
- Gatherer, State, [{MsgId, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
+ Gatherer, State, [{Guid, _SeqId, IsPersistent, _IsDelivered} | Msgs]) ->
case IsPersistent of
- true -> gatherer:produce(Gatherer, {MsgId, 1});
+ true -> gatherer:produce(Gatherer, {Guid, 1});
false -> ok
end,
queue_index_walker_reader1(Gatherer, State, Msgs).
@@ -684,17 +684,17 @@ get_journal_handle(State = #qistate { journal_handle = Hdl }) ->
bool_to_int(true ) -> 1;
bool_to_int(false) -> 0.
-write_entry_to_segment(_RelSeq, {{_MsgId, _IsPersistent}, del, ack}, Hdl) ->
+write_entry_to_segment(_RelSeq, {{_Guid, _IsPersistent}, del, ack}, Hdl) ->
Hdl;
write_entry_to_segment(RelSeq, {Pub, Del, Ack}, Hdl) ->
ok = case Pub of
no_pub ->
ok;
- {MsgId, IsPersistent} ->
+ {Guid, IsPersistent} ->
file_handle_cache:append(
Hdl, [<<?PUBLISH_PREFIX:?PUBLISH_PREFIX_BITS,
(bool_to_int(IsPersistent)):1,
- RelSeq:?REL_SEQ_BITS>>, MsgId])
+ RelSeq:?REL_SEQ_BITS>>, Guid])
end,
ok = case {Del, Ack} of
{no_del, no_ack} ->
@@ -775,10 +775,10 @@ load_segment_entries(KeepAcks, Hdl, SegEntries, PubCount, AckCount) ->
IsPersistentNum:1, RelSeq:?REL_SEQ_BITS>>} ->
%% because we specify /binary, and binaries are complete
%% bytes, the size spec is in bytes, not bits.
- {ok, MsgId} = file_handle_cache:read(Hdl, ?GUID_BYTES),
+ {ok, Guid} = file_handle_cache:read(Hdl, ?GUID_BYTES),
SegEntries1 =
array:set(RelSeq,
- {{MsgId, 1 == IsPersistentNum}, no_del, no_ack},
+ {{Guid, 1 == IsPersistentNum}, no_del, no_ack},
SegEntries),
load_segment_entries(KeepAcks, Hdl, SegEntries1, PubCount + 1,
AckCount);
@@ -837,13 +837,13 @@ load_journal_entries(State = #qistate { journal_handle = Hdl }) ->
load_journal_entries(add_to_journal(SeqId, ack, State));
_ ->
case file_handle_cache:read(Hdl, ?GUID_BYTES) of
- {ok, <<MsgIdNum:?GUID_BITS>>} ->
+ {ok, <<GuidNum:?GUID_BITS>>} ->
%% work around for binary data
%% fragmentation. See
%% rabbit_msg_file:read_next/2
- <<MsgId:?GUID_BYTES/binary>> =
- <<MsgIdNum:?GUID_BITS>>,
- Publish = {MsgId, case Prefix of
+ <<Guid:?GUID_BYTES/binary>> =
+ <<GuidNum:?GUID_BITS>>,
+ Publish = {Guid, case Prefix of
?PUB_PERSIST_JPREFIX -> true;
?PUB_TRANS_JPREFIX -> false
end},
@@ -873,7 +873,7 @@ add_to_journal(RelSeq, Action,
case Action of
del -> Segment1;
ack -> Segment1 #segment { acks = AckCount + 1 };
- {_MsgId, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
+ {_Guid, _IsPersistent} -> Segment1 #segment { pubs = PubCount + 1 }
end;
%% This is a more relaxed version of deliver_or_ack_msg because we can
@@ -912,30 +912,30 @@ journal_plus_segment(JEntries, SegEntries) ->
%% Here, the Out is the Seg Array which we may be adding to (for
%% items only in the journal), modifying (bits in both), or erasing
%% from (ack in journal, not segment).
-journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
+journal_plus_segment(Obj = {{_Guid, _IsPersistent}, no_del, no_ack},
not_found,
RelSeq, Out) ->
array:set(RelSeq, Obj, Out);
-journal_plus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
+journal_plus_segment(Obj = {{_Guid, _IsPersistent}, del, no_ack},
not_found,
RelSeq, Out) ->
array:set(RelSeq, Obj, Out);
-journal_plus_segment({{_MsgId, _IsPersistent}, del, ack},
+journal_plus_segment({{_Guid, _IsPersistent}, del, ack},
not_found,
RelSeq, Out) ->
array:reset(RelSeq, Out);
journal_plus_segment({no_pub, del, no_ack},
- {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
+ {Pub = {_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out) ->
array:set(RelSeq, {Pub, del, no_ack}, Out);
journal_plus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
+ {{_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out) ->
array:reset(RelSeq, Out);
journal_plus_segment({no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
+ {{_Guid, _IsPersistent}, del, no_ack},
RelSeq, Out) ->
array:reset(RelSeq, Out).
@@ -958,77 +958,77 @@ journal_minus_segment(JEntries, SegEntries) ->
%% publish or ack is in both the journal and the segment.
%% Both the same. Must be at least the publish
-journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, no_ack},
+journal_minus_segment(Obj, Obj = {{_Guid, _IsPersistent}, _Del, no_ack},
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved + 1, AcksRemoved};
-journal_minus_segment(Obj, Obj = {{_MsgId, _IsPersistent}, _Del, ack},
+journal_minus_segment(Obj, Obj = {{_Guid, _IsPersistent}, _Del, ack},
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved + 1, AcksRemoved + 1};
%% Just publish in journal
-journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, no_del, no_ack},
+journal_minus_segment(Obj = {{_Guid, _IsPersistent}, no_del, no_ack},
not_found,
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
%% Just deliver in journal
journal_minus_segment(Obj = {no_pub, del, no_ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
+ {{_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, no_ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
+ {{_Guid, _IsPersistent}, del, no_ack},
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved, AcksRemoved};
%% Just ack in journal
journal_minus_segment(Obj = {no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
+ {{_Guid, _IsPersistent}, del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, no_del, ack},
- {{_MsgId, _IsPersistent}, del, ack},
+ {{_Guid, _IsPersistent}, del, ack},
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved, AcksRemoved};
%% Publish and deliver in journal
-journal_minus_segment(Obj = {{_MsgId, _IsPersistent}, del, no_ack},
+journal_minus_segment(Obj = {{_Guid, _IsPersistent}, del, no_ack},
not_found,
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({Pub, del, no_ack},
- {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
+ {Pub = {_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, del, no_ack}, Out),
PubsRemoved + 1, AcksRemoved};
%% Deliver and ack in journal
journal_minus_segment(Obj = {no_pub, del, ack},
- {{_MsgId, _IsPersistent}, no_del, no_ack},
+ {{_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, Obj, Out), PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, del, no_ack},
+ {{_Guid, _IsPersistent}, del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, no_del, ack}, Out),
PubsRemoved, AcksRemoved};
journal_minus_segment({no_pub, del, ack},
- {{_MsgId, _IsPersistent}, del, ack},
+ {{_Guid, _IsPersistent}, del, ack},
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved, AcksRemoved + 1};
%% Publish, deliver and ack in journal
-journal_minus_segment({{_MsgId, _IsPersistent}, del, ack},
+journal_minus_segment({{_Guid, _IsPersistent}, del, ack},
not_found,
_RelSeq, Out, PubsRemoved, AcksRemoved) ->
{Out, PubsRemoved, AcksRemoved};
journal_minus_segment({Pub, del, ack},
- {Pub = {_MsgId, _IsPersistent}, no_del, no_ack},
+ {Pub = {_Guid, _IsPersistent}, no_del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, del, ack}, Out),
PubsRemoved + 1, AcksRemoved};
journal_minus_segment({Pub, del, ack},
- {Pub = {_MsgId, _IsPersistent}, del, no_ack},
+ {Pub = {_Guid, _IsPersistent}, del, no_ack},
RelSeq, Out, PubsRemoved, AcksRemoved) ->
{array:set(RelSeq, {no_pub, no_del, ack}, Out),
PubsRemoved + 1, AcksRemoved}.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 8eb129394d..66f2d3cc3f 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -1022,170 +1022,170 @@ stop_msg_store() ->
guid_bin(X) ->
erlang:md5(term_to_binary(X)).
-msg_store_contains(Atom, MsgIds) ->
+msg_store_contains(Atom, Guids) ->
Atom = lists:foldl(
- fun (MsgId, Atom1) when Atom1 =:= Atom ->
- rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId) end,
- Atom, MsgIds).
+ fun (Guid, Atom1) when Atom1 =:= Atom ->
+ rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end,
+ Atom, Guids).
-msg_store_sync(MsgIds) ->
+msg_store_sync(Guids) ->
Ref = make_ref(),
Self = self(),
- ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, MsgIds,
+ ok = rabbit_msg_store:sync(?PERSISTENT_MSG_STORE, Guids,
fun () -> Self ! {sync, Ref} end),
receive
{sync, Ref} -> ok
after
10000 ->
- io:format("Sync from msg_store missing for guids ~p~n", [MsgIds]),
+ io:format("Sync from msg_store missing for guids ~p~n", [Guids]),
throw(timeout)
end.
-msg_store_read(MsgIds, MSCState) ->
+msg_store_read(Guids, MSCState) ->
lists:foldl(
- fun (MsgId, MSCStateM) ->
- {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
- ?PERSISTENT_MSG_STORE, MsgId, MSCStateM),
+ fun (Guid, MSCStateM) ->
+ {{ok, Guid}, MSCStateN} = rabbit_msg_store:read(
+ ?PERSISTENT_MSG_STORE, Guid, MSCStateM),
MSCStateN
end,
- MSCState, MsgIds).
+ MSCState, Guids).
-msg_store_write(MsgIds, MSCState) ->
+msg_store_write(Guids, MSCState) ->
lists:foldl(
- fun (MsgId, {ok, MSCStateN}) ->
- rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, MsgId, MSCStateN) end,
- {ok, MSCState}, MsgIds).
+ fun (Guid, {ok, MSCStateN}) ->
+ rabbit_msg_store:write(?PERSISTENT_MSG_STORE, Guid, Guid, MSCStateN) end,
+ {ok, MSCState}, Guids).
test_msg_store() ->
stop_msg_store(),
ok = start_msg_store_empty(),
Self = self(),
- MsgIds = [guid_bin(M) || M <- lists:seq(1,100)],
- {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(50, MsgIds),
+ Guids = [guid_bin(M) || M <- lists:seq(1,100)],
+ {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids),
%% check we don't contain any of the msgs we're about to publish
- false = msg_store_contains(false, MsgIds),
+ false = msg_store_contains(false, Guids),
Ref = rabbit_guid:guid(),
MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
%% publish the first half
- {ok, MSCState1} = msg_store_write(MsgIds1stHalf, MSCState),
+ {ok, MSCState1} = msg_store_write(Guids1stHalf, MSCState),
%% sync on the first half
- ok = msg_store_sync(MsgIds1stHalf),
+ ok = msg_store_sync(Guids1stHalf),
%% publish the second half
- {ok, MSCState2} = msg_store_write(MsgIds2ndHalf, MSCState1),
+ {ok, MSCState2} = msg_store_write(Guids2ndHalf, MSCState1),
%% sync on the first half again - the msg_store will be dirty, but
%% we won't need the fsync
- ok = msg_store_sync(MsgIds1stHalf),
+ ok = msg_store_sync(Guids1stHalf),
%% check they're all in there
- true = msg_store_contains(true, MsgIds),
+ true = msg_store_contains(true, Guids),
%% publish the latter half twice so we hit the caching and ref count code
- {ok, MSCState3} = msg_store_write(MsgIds2ndHalf, MSCState2),
+ {ok, MSCState3} = msg_store_write(Guids2ndHalf, MSCState2),
%% check they're still all in there
- true = msg_store_contains(true, MsgIds),
+ true = msg_store_contains(true, Guids),
%% sync on the 2nd half, but do lots of individual syncs to try
%% and cause coalescing to happen
ok = lists:foldl(
- fun (MsgId, ok) -> rabbit_msg_store:sync(
+ fun (Guid, ok) -> rabbit_msg_store:sync(
?PERSISTENT_MSG_STORE,
- [MsgId], fun () -> Self ! {sync, MsgId} end)
- end, ok, MsgIds2ndHalf),
+ [Guid], fun () -> Self ! {sync, Guid} end)
+ end, ok, Guids2ndHalf),
lists:foldl(
- fun(MsgId, ok) ->
+ fun(Guid, ok) ->
receive
- {sync, MsgId} -> ok
+ {sync, Guid} -> ok
after
10000 ->
io:format("Sync from msg_store missing (guid: ~p)~n",
- [MsgId]),
+ [Guid]),
throw(timeout)
end
- end, ok, MsgIds2ndHalf),
+ end, ok, Guids2ndHalf),
%% it's very likely we're not dirty here, so the 1st half sync
%% should hit a different code path
- ok = msg_store_sync(MsgIds1stHalf),
+ ok = msg_store_sync(Guids1stHalf),
%% read them all
- MSCState4 = msg_store_read(MsgIds, MSCState3),
+ MSCState4 = msg_store_read(Guids, MSCState3),
%% read them all again - this will hit the cache, not disk
- MSCState5 = msg_store_read(MsgIds, MSCState4),
+ MSCState5 = msg_store_read(Guids, MSCState4),
%% remove them all
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds),
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids),
%% check first half doesn't exist
- false = msg_store_contains(false, MsgIds1stHalf),
+ false = msg_store_contains(false, Guids1stHalf),
%% check second half does exist
- true = msg_store_contains(true, MsgIds2ndHalf),
+ true = msg_store_contains(true, Guids2ndHalf),
%% read the second half again
- MSCState6 = msg_store_read(MsgIds2ndHalf, MSCState5),
+ MSCState6 = msg_store_read(Guids2ndHalf, MSCState5),
%% release the second half, just for fun (aka code coverage)
- ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, MsgIds2ndHalf),
+ ok = rabbit_msg_store:release(?PERSISTENT_MSG_STORE, Guids2ndHalf),
%% read the second half again, just for fun (aka code coverage)
- MSCState7 = msg_store_read(MsgIds2ndHalf, MSCState6),
+ MSCState7 = msg_store_read(Guids2ndHalf, MSCState6),
ok = rabbit_msg_store:client_terminate(MSCState7),
%% stop and restart, preserving every other msg in 2nd half
ok = stop_msg_store(),
ok = start_msg_store(fun ([]) -> finished;
- ([MsgId|MsgIdsTail])
- when length(MsgIdsTail) rem 2 == 0 ->
- {MsgId, 1, MsgIdsTail};
- ([MsgId|MsgIdsTail]) ->
- {MsgId, 0, MsgIdsTail}
- end, MsgIds2ndHalf),
+ ([Guid|GuidsTail])
+ when length(GuidsTail) rem 2 == 0 ->
+ {Guid, 1, GuidsTail};
+ ([Guid|GuidsTail]) ->
+ {Guid, 0, GuidsTail}
+ end, Guids2ndHalf),
%% check we have the right msgs left
lists:foldl(
- fun (MsgId, Bool) ->
- not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, MsgId))
- end, false, MsgIds2ndHalf),
+ fun (Guid, Bool) ->
+ not(Bool = rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid))
+ end, false, Guids2ndHalf),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(),
%% check we don't contain any of the msgs
- false = msg_store_contains(false, MsgIds),
+ false = msg_store_contains(false, Guids),
%% publish the first half again
MSCState8 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref),
- {ok, MSCState9} = msg_store_write(MsgIds1stHalf, MSCState8),
+ {ok, MSCState9} = msg_store_write(Guids1stHalf, MSCState8),
%% this should force some sort of sync internally otherwise misread
ok = rabbit_msg_store:client_terminate(
- msg_store_read(MsgIds1stHalf, MSCState9)),
- ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, MsgIds1stHalf),
+ msg_store_read(Guids1stHalf, MSCState9)),
+ ok = rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, Guids1stHalf),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(), %% now safe to reuse guids
%% push a lot of msgs in...
BigCount = 100000,
- MsgIdsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
+ GuidsBig = [guid_bin(X) || X <- lists:seq(1, BigCount)],
Payload = << 0:65536 >>,
ok = rabbit_msg_store:client_terminate(
lists:foldl(
- fun (MsgId, MSCStateN) ->
+ fun (Guid, MSCStateN) ->
{ok, MSCStateM} =
- rabbit_msg_store:write(?PERSISTENT_MSG_STORE, MsgId, Payload, MSCStateN),
+ rabbit_msg_store:write(?PERSISTENT_MSG_STORE, Guid, Payload, MSCStateN),
MSCStateM
- end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)),
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), GuidsBig)),
%% now read them to ensure we hit the fast client-side reading
ok = rabbit_msg_store:client_terminate(
lists:foldl(
- fun (MsgId, MSCStateM) ->
+ fun (Guid, MSCStateM) ->
{{ok, Payload}, MSCStateN} =
- rabbit_msg_store:read(?PERSISTENT_MSG_STORE, MsgId, MSCStateM),
+ rabbit_msg_store:read(?PERSISTENT_MSG_STORE, Guid, MSCStateM),
MSCStateN
- end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), MsgIdsBig)),
+ end, rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref), GuidsBig)),
%% .., then 3s by 1...
ok = lists:foldl(
- fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
+ fun (Guid, ok) ->
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)])
end, ok, lists:seq(BigCount, 1, -3)),
%% .., then remove 3s by 2, from the young end first. This hits
%% GC (under 50% good data left, but no empty files. Must GC).
ok = lists:foldl(
- fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
+ fun (Guid, ok) ->
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)])
end, ok, lists:seq(BigCount-1, 1, -3)),
%% .., then remove 3s by 3, from the young end first. This hits
%% GC...
ok = lists:foldl(
- fun (MsgId, ok) ->
- rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(MsgId)])
+ fun (Guid, ok) ->
+ rabbit_msg_store:remove(?PERSISTENT_MSG_STORE, [guid_bin(Guid)])
end, ok, lists:seq(BigCount-2, 1, -3)),
%% ensure empty
- false = msg_store_contains(false, MsgIdsBig),
+ false = msg_store_contains(false, GuidsBig),
%% restart empty
ok = stop_msg_store(),
ok = start_msg_store_empty(),
@@ -1211,13 +1211,13 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
end,
{A, B, MSCStateEnd} =
lists:foldl(
- fun (SeqId, {QiN, SeqIdsMsgIdsAcc, MSCStateN}) ->
+ fun (SeqId, {QiN, SeqIdsGuidsAcc, MSCStateN}) ->
Guid = rabbit_guid:guid(),
QiM = rabbit_queue_index:write_published(Guid, SeqId, Persistent,
QiN),
{ok, MSCStateM} = rabbit_msg_store:write(MsgStore, Guid,
Guid, MSCStateN),
- {QiM, [{SeqId, Guid} | SeqIdsMsgIdsAcc], MSCStateM}
+ {QiM, [{SeqId, Guid} | SeqIdsGuidsAcc], MSCStateM}
end, {Qi, [], rabbit_msg_store:client_init(MsgStore, Ref)}, SeqIds),
ok = rabbit_msg_store:delete_client(MsgStore, Ref),
ok = rabbit_msg_store:client_terminate(MSCStateEnd),
@@ -1235,8 +1235,8 @@ queue_index_flush_journal(Qi) ->
verify_read_with_published(_Delivered, _Persistent, [], _) ->
ok;
verify_read_with_published(Delivered, Persistent,
- [{MsgId, SeqId, Persistent, Delivered}|Read],
- [{SeqId, MsgId}|Published]) ->
+ [{Guid, SeqId, Persistent, Delivered}|Read],
+ [{SeqId, Guid}|Published]) ->
verify_read_with_published(Delivered, Persistent, Read, Published);
verify_read_with_published(_Delivered, _Persistent, _Read, _Published) ->
ko.
@@ -1251,12 +1251,12 @@ test_queue_index() ->
{0, _PRef, _TRef, _Terms, Qi0} = rabbit_queue_index:init(test_queue(), false),
{0, 0, Qi1} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi0),
- {Qi2, SeqIdsMsgIdsA} = queue_index_publish(SeqIdsA, false, Qi1),
+ {Qi2, SeqIdsGuidsA} = queue_index_publish(SeqIdsA, false, Qi1),
{0, SegmentSize, Qi3} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi2),
{ReadA, Qi4} = rabbit_queue_index:read_segment_entries(0, Qi3),
ok = verify_read_with_published(false, false, ReadA,
- lists:reverse(SeqIdsMsgIdsA)),
+ lists:reverse(SeqIdsGuidsA)),
%% call terminate twice to prove it's idempotent
_Qi5 = rabbit_queue_index:terminate([], rabbit_queue_index:terminate([], Qi4)),
ok = stop_msg_store(),
@@ -1265,12 +1265,12 @@ test_queue_index() ->
{0, _PRef1, _TRef1, _Terms1, Qi6} = rabbit_queue_index:init(test_queue(), false),
{0, 0, Qi7} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi6),
- {Qi8, SeqIdsMsgIdsB} = queue_index_publish(SeqIdsB, true, Qi7),
+ {Qi8, SeqIdsGuidsB} = queue_index_publish(SeqIdsB, true, Qi7),
{0, TwoSegs, Qi9} =
rabbit_queue_index:find_lowest_seq_id_seg_and_next_seq_id(Qi8),
{ReadB, Qi10} = rabbit_queue_index:read_segment_entries(0, Qi9),
ok = verify_read_with_published(false, true, ReadB,
- lists:reverse(SeqIdsMsgIdsB)),
+ lists:reverse(SeqIdsGuidsB)),
_Qi11 = rabbit_queue_index:terminate([], Qi10),
ok = stop_msg_store(),
ok = rabbit_queue_index:start_msg_stores([test_queue()]),
@@ -1282,7 +1282,7 @@ test_queue_index() ->
Qi14 = queue_index_deliver(SeqIdsB, Qi13),
{ReadC, Qi15} = rabbit_queue_index:read_segment_entries(0, Qi14),
ok = verify_read_with_published(true, true, ReadC,
- lists:reverse(SeqIdsMsgIdsB)),
+ lists:reverse(SeqIdsGuidsB)),
Qi16 = rabbit_queue_index:write_acks(SeqIdsB, Qi15),
Qi17 = queue_index_flush_journal(Qi16),
%% Everything will have gone now because #pubs == #acks
@@ -1302,20 +1302,20 @@ test_queue_index() ->
%% a) partial pub+del+ack, then move to new segment
SeqIdsC = lists:seq(0,trunc(SegmentSize/2)),
{0, _PRef4, _TRef4, _Terms4, Qi22} = rabbit_queue_index:init(test_queue(), false),
- {Qi23, _SeqIdsMsgIdsC} = queue_index_publish(SeqIdsC, false, Qi22),
+ {Qi23, _SeqIdsGuidsC} = queue_index_publish(SeqIdsC, false, Qi22),
Qi24 = queue_index_deliver(SeqIdsC, Qi23),
Qi25 = rabbit_queue_index:write_acks(SeqIdsC, Qi24),
Qi26 = queue_index_flush_journal(Qi25),
- {Qi27, _SeqIdsMsgIdsC1} = queue_index_publish([SegmentSize], false, Qi26),
+ {Qi27, _SeqIdsGuidsC1} = queue_index_publish([SegmentSize], false, Qi26),
_Qi28 = rabbit_queue_index:terminate_and_erase(Qi27),
ok = stop_msg_store(),
ok = empty_test_queue(),
%% b) partial pub+del, then move to new segment, then ack all in old segment
{0, _PRef5, _TRef5, _Terms5, Qi29} = rabbit_queue_index:init(test_queue(), false),
- {Qi30, _SeqIdsMsgIdsC2} = queue_index_publish(SeqIdsC, false, Qi29),
+ {Qi30, _SeqIdsGuidsC2} = queue_index_publish(SeqIdsC, false, Qi29),
Qi31 = queue_index_deliver(SeqIdsC, Qi30),
- {Qi32, _SeqIdsMsgIdsC3} = queue_index_publish([SegmentSize], false, Qi31),
+ {Qi32, _SeqIdsGuidsC3} = queue_index_publish([SegmentSize], false, Qi31),
Qi33 = rabbit_queue_index:write_acks(SeqIdsC, Qi32),
Qi34 = queue_index_flush_journal(Qi33),
_Qi35 = rabbit_queue_index:terminate_and_erase(Qi34),
@@ -1325,7 +1325,7 @@ test_queue_index() ->
%% c) just fill up several segments of all pubs, then +dels, then +acks
SeqIdsD = lists:seq(0,SegmentSize*4),
{0, _PRef6, _TRef6, _Terms6, Qi36} = rabbit_queue_index:init(test_queue(), false),
- {Qi37, _SeqIdsMsgIdsD} = queue_index_publish(SeqIdsD, false, Qi36),
+ {Qi37, _SeqIdsGuidsD} = queue_index_publish(SeqIdsD, false, Qi36),
Qi38 = queue_index_deliver(SeqIdsD, Qi37),
Qi39 = rabbit_queue_index:write_acks(SeqIdsD, Qi38),
Qi40 = queue_index_flush_journal(Qi39),
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index f2e9c19c56..164533b716 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -319,7 +319,7 @@ publish(Msg, State) ->
{_SeqId, State2} = publish(Msg, false, false, State1),
State2.
-publish_delivered(Msg = #basic_message { guid = MsgId,
+publish_delivered(Msg = #basic_message { guid = Guid,
is_persistent = IsPersistent },
State = #vqstate { len = 0, index_state = IndexState,
next_seq_id = SeqId,
@@ -331,7 +331,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
State1 = State #vqstate { out_counter = OutCount + 1,
in_counter = InCount + 1 },
MsgStatus = #msg_status {
- msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = true, msg_on_disk = false, index_on_disk = false },
{MsgStatus1, MSCState1} = maybe_write_msg_to_disk(PersistentStore, false,
MsgStatus, MSCState),
@@ -344,7 +344,7 @@ publish_delivered(Msg = #basic_message { guid = MsgId,
true ->
{#msg_status { index_on_disk = true }, IndexState1} =
maybe_write_index_to_disk(false, MsgStatus1, IndexState),
- {{ack_index_and_store, MsgId, SeqId,
+ {{ack_index_and_store, Guid, SeqId,
find_msg_store(IsPersistent, PersistentStore)},
State2 #vqstate { index_state = IndexState1,
next_seq_id = SeqId + 1 }};
@@ -411,7 +411,7 @@ fetch(State =
{empty, _Q4} ->
fetch_from_q3_or_delta(State);
{{value, #msg_status {
- msg = Msg, guid = MsgId, seq_id = SeqId,
+ msg = Msg, guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent, is_delivered = IsDelivered,
msg_on_disk = MsgOnDisk, index_on_disk = IndexOnDisk }},
Q4a} ->
@@ -436,11 +436,11 @@ fetch(State =
AckTag =
case IsPersistent of
true -> true = MsgOnDisk, %% ASSERTION
- {ack_index_and_store, MsgId, SeqId, MsgStore};
+ {ack_index_and_store, Guid, SeqId, MsgStore};
false -> ok = case MsgOnDisk of
true ->
rabbit_msg_store:remove(
- MsgStore, [MsgId]);
+ MsgStore, [Guid]);
false -> ok
end,
ack_not_on_disk
@@ -457,22 +457,22 @@ ack([], State) ->
ack(AckTags, State = #vqstate { index_state = IndexState,
persistent_count = PCount,
persistent_store = PersistentStore }) ->
- {MsgIdsByStore, SeqIds} =
+ {GuidsByStore, SeqIds} =
lists:foldl(
fun (ack_not_on_disk, Acc) -> Acc;
- ({ack_index_and_store, MsgId, SeqId, MsgStore}, {Dict, SeqIds}) ->
- {rabbit_misc:dict_cons(MsgStore, MsgId, Dict), [SeqId | SeqIds]}
+ ({ack_index_and_store, Guid, SeqId, MsgStore}, {Dict, SeqIds}) ->
+ {rabbit_misc:dict_cons(MsgStore, Guid, Dict), [SeqId | SeqIds]}
end, {dict:new(), []}, AckTags),
IndexState1 = case SeqIds of
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
- rabbit_msg_store:remove(MsgStore, MsgIds)
- end, ok, MsgIdsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
error -> 0;
- {ok, MsgIds} -> length(MsgIds)
+ {ok, Guids} -> length(Guids)
end,
State #vqstate { index_state = IndexState1, persistent_count = PCount1 }.
@@ -533,20 +533,20 @@ delete_and_terminate(State) ->
%% msg_store:release so that the cache isn't held full of msgs which
%% are now at the tail of the queue.
requeue(MsgsWithAckTags, State) ->
- {SeqIds, MsgIdsByStore,
+ {SeqIds, GuidsByStore,
State1 = #vqstate { index_state = IndexState,
persistent_count = PCount,
persistent_store = PersistentStore }} =
lists:foldl(
- fun ({Msg = #basic_message { guid = MsgId }, AckTag},
+ fun ({Msg = #basic_message { guid = Guid }, AckTag},
{SeqIdsAcc, Dict, StateN}) ->
{SeqIdsAcc1, Dict1, MsgOnDisk} =
case AckTag of
ack_not_on_disk ->
{SeqIdsAcc, Dict, false};
- {ack_index_and_store, MsgId, SeqId, MsgStore} ->
+ {ack_index_and_store, Guid, SeqId, MsgStore} ->
{[SeqId | SeqIdsAcc],
- rabbit_misc:dict_cons(MsgStore, MsgId, Dict),
+ rabbit_misc:dict_cons(MsgStore, Guid, Dict),
true}
end,
{_SeqId, StateN1} =
@@ -558,21 +558,21 @@ requeue(MsgsWithAckTags, State) ->
[] -> IndexState;
_ -> rabbit_queue_index:write_acks(SeqIds, IndexState)
end,
- ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
- rabbit_msg_store:release(MsgStore, MsgIds)
- end, ok, MsgIdsByStore),
- PCount1 = PCount - case dict:find(PersistentStore, MsgIdsByStore) of
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:release(MsgStore, Guids)
+ end, ok, GuidsByStore),
+ PCount1 = PCount - case dict:find(PersistentStore, GuidsByStore) of
error -> 0;
- {ok, MsgIds} -> length(MsgIds)
+ {ok, Guids} -> length(Guids)
end,
State1 #vqstate { index_state = IndexState1,
persistent_count = PCount1 }.
-tx_publish(Msg = #basic_message { is_persistent = true, guid = MsgId },
+tx_publish(Msg = #basic_message { is_persistent = true, guid = Guid },
State = #vqstate { msg_store_clients = MSCState,
persistent_store = PersistentStore }) ->
MsgStatus = #msg_status {
- msg = Msg, guid = MsgId, seq_id = undefined, is_persistent = true,
+ msg = Msg, guid = Guid, seq_id = undefined, is_persistent = true,
is_delivered = false, msg_on_disk = false, index_on_disk = false },
{#msg_status { msg_on_disk = true }, MSCState1} =
maybe_write_msg_to_disk(PersistentStore, false, MsgStatus, MSCState),
@@ -591,8 +591,8 @@ tx_commit(Pubs, AckTags, From, State =
#vqstate { persistent_store = PersistentStore }) ->
%% If we are a non-durable queue, or we have no persistent pubs,
%% we can skip the msg_store loop.
- PersistentMsgIds = persistent_guids(Pubs),
- IsTransientPubs = [] == PersistentMsgIds,
+ PersistentGuids = persistent_guids(Pubs),
+ IsTransientPubs = [] == PersistentGuids,
case IsTransientPubs orelse
?TRANSIENT_MSG_STORE == PersistentStore of
true ->
@@ -601,7 +601,7 @@ tx_commit(Pubs, AckTags, From, State =
false ->
Self = self(),
ok = rabbit_msg_store:sync(
- ?PERSISTENT_MSG_STORE, PersistentMsgIds,
+ ?PERSISTENT_MSG_STORE, PersistentGuids,
fun () -> ok = rabbit_amqqueue:maybe_run_queue_via_backing_queue(
Self,
fun (StateN) -> tx_commit_post_msg_store(
@@ -636,7 +636,7 @@ tx_commit_index(State = #vqstate { on_sync = {SAcks, SPubs, SFroms},
persistent_store = PersistentStore }) ->
Acks = lists:flatten(SAcks),
State1 = ack(Acks, State),
- AckSeqIds = lists:foldl(fun ({ack_index_and_store, _MsgId,
+ AckSeqIds = lists:foldl(fun ({ack_index_and_store, _Guid,
SeqId, ?PERSISTENT_MSG_STORE}, SeqIdsAcc) ->
[SeqId | SeqIdsAcc];
(_, SeqIdsAcc) ->
@@ -700,13 +700,13 @@ update_rate(Now, Then, Count, {OThen, OCount}) ->
{Avg, {Then, Count}}.
persistent_guids(Pubs) ->
- [MsgId || Obj = #basic_message { guid = MsgId } <- Pubs,
+ [Guid || Obj = #basic_message { guid = Guid } <- Pubs,
Obj #basic_message.is_persistent].
betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
{Filtered, IndexState1} =
lists:foldr(
- fun ({MsgId, SeqId, IsPersistent, IsDelivered},
+ fun ({Guid, SeqId, IsPersistent, IsDelivered},
{FilteredAcc, IndexStateAcc}) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true ->
@@ -722,7 +722,7 @@ betas_from_segment_entries(List, SeqIdLimit, TransientThreshold, IndexState) ->
case SeqId < SeqIdLimit of
true ->
{[#msg_status { msg = undefined,
- guid = MsgId,
+ guid = Guid,
seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered,
@@ -838,12 +838,12 @@ purge1(Count, State = #vqstate { q3 = Q3, index_state = IndexState,
end.
remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
- {_PersistentStore, Count, MsgIdsByStore, SeqIds, IndexState1} =
+ {_PersistentStore, Count, GuidsByStore, SeqIds, IndexState1} =
Fold(fun remove_queue_entries1/2,
{PersistentStore, 0, dict:new(), [], IndexState}, Q),
- ok = dict:fold(fun (MsgStore, MsgIds, ok) ->
- rabbit_msg_store:remove(MsgStore, MsgIds)
- end, ok, MsgIdsByStore),
+ ok = dict:fold(fun (MsgStore, Guids, ok) ->
+ rabbit_msg_store:remove(MsgStore, Guids)
+ end, ok, GuidsByStore),
IndexState2 =
case SeqIds of
[] -> IndexState1;
@@ -852,18 +852,18 @@ remove_queue_entries(PersistentStore, Fold, Q, IndexState) ->
{Count, IndexState2}.
remove_queue_entries1(
- #msg_status { guid = MsgId, seq_id = SeqId,
+ #msg_status { guid = Guid, seq_id = SeqId,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = IndexOnDisk, is_persistent = IsPersistent },
- {PersistentStore, CountN, MsgIdsByStore, SeqIdsAcc, IndexStateN}) ->
- MsgIdsByStore1 =
+ {PersistentStore, CountN, GuidsByStore, SeqIdsAcc, IndexStateN}) ->
+ GuidsByStore1 =
case {MsgOnDisk, IsPersistent} of
{true, true} ->
- rabbit_misc:dict_cons(PersistentStore, MsgId, MsgIdsByStore);
+ rabbit_misc:dict_cons(PersistentStore, Guid, GuidsByStore);
{true, false} ->
- rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, MsgId, MsgIdsByStore);
+ rabbit_misc:dict_cons(?TRANSIENT_MSG_STORE, Guid, GuidsByStore);
{false, _} ->
- MsgIdsByStore
+ GuidsByStore
end,
SeqIdsAcc1 = case IndexOnDisk of
true -> [SeqId | SeqIdsAcc];
@@ -874,7 +874,7 @@ remove_queue_entries1(
SeqId, IndexStateN);
false -> IndexStateN
end,
- {PersistentStore, CountN + 1, MsgIdsByStore1, SeqIdsAcc1, IndexStateN1}.
+ {PersistentStore, CountN + 1, GuidsByStore1, SeqIdsAcc1, IndexStateN1}.
fetch_from_q3_or_delta(State = #vqstate {
q1 = Q1, q2 = Q2, delta = #delta { count = DeltaCount },
@@ -889,12 +889,12 @@ fetch_from_q3_or_delta(State = #vqstate {
true = queue:is_empty(Q1), %% ASSERTION
{empty, State};
{{value, IndexOnDisk, MsgStatus = #msg_status {
- msg = undefined, guid = MsgId,
+ msg = undefined, guid = Guid,
is_persistent = IsPersistent }}, Q3a} ->
{{ok, Msg = #basic_message { is_persistent = IsPersistent,
- guid = MsgId }}, MSCState1} =
+ guid = Guid }}, MSCState1} =
read_from_msg_store(
- PersistentStore, MSCState, IsPersistent, MsgId),
+ PersistentStore, MSCState, IsPersistent, Guid),
Q4a = queue:in(MsgStatus #msg_status { msg = Msg }, Q4),
RamIndexCount1 = case IndexOnDisk of
true -> RamIndexCount;
@@ -978,12 +978,12 @@ test_keep_msg_in_ram(SeqId, #vqstate { target_ram_msg_count = TargetRamMsgCount,
end
end.
-publish(Msg = #basic_message { is_persistent = IsPersistent, guid = MsgId },
+publish(Msg = #basic_message { is_persistent = IsPersistent, guid = Guid },
IsDelivered, MsgOnDisk, State =
#vqstate { next_seq_id = SeqId, len = Len, in_counter = InCount,
persistent_count = PCount }) ->
MsgStatus = #msg_status {
- msg = Msg, guid = MsgId, seq_id = SeqId, is_persistent = IsPersistent,
+ msg = Msg, guid = Guid, seq_id = SeqId, is_persistent = IsPersistent,
is_delivered = IsDelivered, msg_on_disk = MsgOnDisk,
index_on_disk = false },
PCount1 = PCount + case IsPersistent of
@@ -1084,11 +1084,11 @@ with_msg_store_state(_PersistentStore, {MSCStateP, {MSCStateT, TRef}}, false,
{Result, MSCStateT1} = Fun(?TRANSIENT_MSG_STORE, MSCStateT),
{Result, {MSCStateP, {MSCStateT1, TRef}}}.
-read_from_msg_store(PersistentStore, MSCState, IsPersistent, MsgId) ->
+read_from_msg_store(PersistentStore, MSCState, IsPersistent, Guid) ->
with_msg_store_state(
PersistentStore, MSCState, IsPersistent,
fun (MsgStore, MSCState1) ->
- rabbit_msg_store:read(MsgStore, MsgId, MSCState1)
+ rabbit_msg_store:read(MsgStore, Guid, MSCState1)
end).
maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
@@ -1096,7 +1096,7 @@ maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus =
{MsgStatus, MSCState};
maybe_write_msg_to_disk(PersistentStore, Force,
MsgStatus = #msg_status {
- msg = Msg, guid = MsgId,
+ msg = Msg, guid = Guid,
is_persistent = IsPersistent }, MSCState)
when Force orelse IsPersistent ->
{ok, MSCState1} =
@@ -1104,7 +1104,7 @@ maybe_write_msg_to_disk(PersistentStore, Force,
PersistentStore, MSCState, IsPersistent,
fun (MsgStore, MSCState2) ->
rabbit_msg_store:write(
- MsgStore, MsgId, ensure_binary_properties(Msg), MSCState2)
+ MsgStore, Guid, ensure_binary_properties(Msg), MSCState2)
end),
{MsgStatus #msg_status { msg_on_disk = true }, MSCState1};
maybe_write_msg_to_disk(_PersistentStore, _Force, MsgStatus, MSCState) ->
@@ -1115,13 +1115,13 @@ maybe_write_index_to_disk(_Force, MsgStatus =
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
{MsgStatus, IndexState};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
- guid = MsgId, seq_id = SeqId,
+ guid = Guid, seq_id = SeqId,
is_persistent = IsPersistent,
is_delivered = IsDelivered }, IndexState)
when Force orelse IsPersistent ->
true = MsgStatus #msg_status.msg_on_disk, %% ASSERTION
IndexState1 = rabbit_queue_index:write_published(
- MsgId, SeqId, IsPersistent, IndexState),
+ Guid, SeqId, IsPersistent, IndexState),
{MsgStatus #msg_status { index_on_disk = true },
case IsDelivered of
true -> rabbit_queue_index:write_delivered(SeqId, IndexState1);