author    Loïc Hoguin <lhoguin@vmware.com>  2023-03-10 09:48:43 +0100
committer Loïc Hoguin <lhoguin@vmware.com>  2023-03-14 16:15:54 +0100
commit    a77886a9316b86a22094caadb9ec69d8aa2ceaa9 (patch)
tree      60cd1b253d76c01cb36df273ce552589944438d9
parent    fac8c5df289175d538fd719661efa82737ccd0aa (diff)
download  rabbitmq-server-git-lh-msg-store.tar.gz
WIP flying optim (lh-msg-store)
-rw-r--r--  deps/rabbit/src/rabbit_msg_store.erl       300
-rw-r--r--  deps/rabbit/src/rabbit_variable_queue.erl   16
-rw-r--r--  deps/rabbit/test/backing_queue_SUITE.erl   138
3 files changed, 330 insertions, 124 deletions
diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index 0784d80ba0..a030e47b71 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -12,7 +12,7 @@
-export([start_link/5, successfully_recovered_state/1,
client_init/3, client_terminate/1, client_delete_and_terminate/1,
client_ref/1,
- write/3, write_flow/3, read/2, read_many/2, contains/2, remove/2]).
+ write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
-export([set_maximum_since_use/2,
compact_file/2, truncate_file/4, delete_file/2]). %% internal
@@ -45,6 +45,47 @@
%% i.e. two pairs, so GC does not go idle when busy
-define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
+%% Flying events. They get added as things happen. The entry in the table
+%% may get removed after the server processes a write or after it processes
+%% a remove. When the entry is deleted after a write and later re-inserted
+%% (by a remove, via the update_counter default), it takes the same value
+%% it would have had if it had never been deleted.
+%%
+%% So the possible values are only:
+%% - 1: client write
+%% - 3: client and server write
+%% - 7: client and server write, plus a client remove before the entry
+%%      could be deleted
+%%
+%% Values 1 and 7 indicate there is a message in flight.
+
+%% @todo
+%% We want to keep track of pending messages; we don't care what the server
+%% has done with them (it will delete the object once it's done, if
+%% possible). So we only need two values: a write is in flight, and a
+%% remove is in flight.
+
+-define(FLYING_WRITE, 1). %% ets:insert_new (not strictly necessary, but worth doing)
+-define(FLYING_WRITE_DONE, 2). %% ets:update_counter followed by a tentative remove
+-define(FLYING_REMOVE, 4). %% ets:update_counter
+%% Useful states.
+-define(FLYING_IS_WRITTEN, ?FLYING_WRITE + ?FLYING_WRITE_DONE). %% Write was handled.
+-define(FLYING_IS_IGNORED, ?FLYING_WRITE + ?FLYING_REMOVE). %% Remove before write was handled.
+-define(FLYING_IS_REMOVED, ?FLYING_WRITE + ?FLYING_WRITE_DONE + ?FLYING_REMOVE). %% Remove.
+
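+%% A worked trace of the flag arithmetic (a sketch derived from the
+%% constants above, not part of the patch's code path):
+%% - write only:       insert 1; server write +2 = 3; delete_object({Key, 3})
+%% - remove first:     insert 1; client remove +4 = 5 (IS_IGNORED); the
+%%                     server skips both the write and the remove
+%% - remove after the
+%%   server write:     insert 1; +2 = 3; entry deleted; the client remove
+%%                     re-inserts the default 3 and adds 4 = 7 (IS_REMOVED)
+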
+%% @todo OK so on write we can just insert_new and on remove we can just update_counter with a default.
+%% @todo On server write we lookup and then later we delete_object with value 3
+%% We could do a match of CRef + value 1 to get all flying messages to do multi-write
+%% if we wanted to, and group the confirms.
+%% @todo On server remove we lookup and then delete by key
+%%
+%% @todo What to do if a value is not as expected? Can this happen? Probably not?
+%% Maybe not on store crash but what about queue crash? It's possible that
+%% in that case we get a flying entry but not a message so we could leak
+%% that way...
+
+%% We keep track of flying messages for writes and removes. The idea is that
+%% when a remove comes in before we could process the write, we skip the
+%% write and send a publisher confirm immediately. We later skip the remove
+%% as well since we didn't process the write.
+
%%----------------------------------------------------------------------------
-record(msstate,
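
The protocol those notes describe can be exercised on its own. A minimal,
runnable sketch (the module, table and key below are made up for
illustration; only the flag values mirror the defines in this patch):

-module(flying_sketch).
-export([demo/0]).

%% Mirror of the ?FLYING_* defines above.
-define(WRITE, 1).
-define(WRITE_DONE, 2).
-define(REMOVE, 4).
-define(IGNORED, ?WRITE + ?REMOVE).

demo() ->
    T = ets:new(flying, [set, public]),
    Key = {client_ref, msg_ref},
    %% Client write, then a client remove that arrives before the server
    %% has handled the write.
    true = ets:insert_new(T, {Key, ?WRITE}),
    _ = ets:update_counter(T, Key, ?REMOVE, {Key, ?WRITE + ?WRITE_DONE}),
    %% Server write: ?IGNORED (5) means the disk write is skipped and the
    %% publisher confirm can be sent immediately.
    [{Key, ?IGNORED}] = ets:lookup(T, Key),
    %% Server remove: also skipped; the entry is deleted unconditionally.
    true = ets:delete(T, Key),
    ok.

The delete_object/delete split in the real code exists because the server
must not wipe an entry that a concurrent client remove has just bumped:
delete_object({Key, 3}) only deletes if the stored object still equals
{Key, 3}.
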
@@ -417,9 +458,9 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
--spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+-spec write_flow(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
-write_flow(MsgId, Msg,
+write_flow(MsgRef, MsgId, Msg,
CState = #client_msstate {
server = Server,
credit_disc_bound = CreditDiscBound }) ->
@@ -428,11 +469,12 @@ write_flow(MsgId, Msg,
%% rabbit_variable_queue. We are accessing the
%% rabbit_amqqueue_process process dictionary.
credit_flow:send(Server, CreditDiscBound),
- client_write(MsgId, Msg, flow, CState).
+ client_write(MsgRef, MsgId, Msg, flow, CState).
--spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+-spec write(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
-write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
+%% This function is only used by tests.
+write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CState).
-spec read(rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
@@ -452,6 +494,20 @@ read(MsgId,
{{ok, Msg}, CState}
end.
+%% write to cache
+%% cast {write,...}
+%% some other write makes the file roll over and deletes entries from the
+%% cache (not everything though: entries that weren't written yet, or were
+%% ignored, are kept)
+%% read from cache: entry not there
+%% read from index: {write,...} not processed yet
+%% crash: except this cannot happen, precisely because of the caveat above
+
+%% So the not_found is impossible unless there's a bug.
+
+%% what we want: the cache shouldn't be completely wiped, only the entries
+%% that don't match the current file.
+%% Maybe use a timestamp that lets us only remove what was fully written,
+%% set in both the cache entry and {write,...}? I don't think that is good
+%% enough. What else can be used? We could track which messages were truly
+%% written and only remove those from the cache, but doing that in the main
+%% process could be slow; maybe do it in the GC process?
+
-spec read_many([rabbit_types:msg_id()], client_msstate()) -> #{rabbit_types:msg_id() => msg()}.
%% We disable read_many when the index module is not ETS for the time being.
@@ -552,8 +608,12 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
-spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.
remove([], _CState) -> ok;
-remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
- [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
+remove(MsgIds, CState = #client_msstate { flying_ets = FlyingEts,
+ client_ref = CRef }) ->
+    %% @todo We write one entry at a time; could we avoid doing N ets updates
+    %%       for N messages? No: a batched insert_new inserts nothing if even
+    %%       one of the keys already exists.
+ %% If the entry was deleted we act as if it wasn't by using the right default.
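+    %% ets:update_counter/4 inserts the default object (its '' key is
+    %% replaced by the actual key) when the entry is absent, then applies
+    %% the increment: a missing entry thus becomes ?FLYING_IS_WRITTEN +
+    %% ?FLYING_REMOVE = ?FLYING_IS_REMOVED, as if it had never been deleted.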
+ [ets:update_counter(FlyingEts, {CRef, MsgRef}, ?FLYING_REMOVE, {'', ?FLYING_IS_WRITTEN}) || {MsgRef, _} <- MsgIds],
server_cast(CState, {remove, CRef, MsgIds}).
-spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.
@@ -571,13 +631,16 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
-client_write(MsgId, Msg, Flow,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+client_write(MsgRef, MsgId, Msg, Flow,
+ CState = #client_msstate { flying_ets = FlyingEts,
+ cur_file_cache_ets = CurFileCacheEts,
client_ref = CRef }) ->
file_handle_cache_stats:update(msg_store_write),
- ok = client_update_flying(+1, MsgId, CState),
+ %% We are guaranteed that the insert will succeed.
+ %% This is true even for queue crashes because CRef will change.
+ true = ets:insert_new(FlyingEts, {{CRef, MsgRef}, ?FLYING_WRITE}),
ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
- ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+ ok = server_cast(CState, {write, CRef, MsgRef, MsgId, Flow}).
%% We no longer check for whether the message's file is locked because we
%% rely on the fact that the file gets removed only when there are no
@@ -604,30 +667,35 @@ client_read3(#msg_location { msg_id = MsgId, file = File },
{{ok, Msg}, CState1}
end.
-client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
- client_ref = CRef }) ->
- Key = {MsgId, CRef},
- case ets:insert_new(FlyingEts, {Key, Diff}) of
- true -> ok;
- false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
- 0 -> ok;
- Diff -> ok;
- Err when Err >= 2 ->
- %% The message must be referenced twice in the queue
- %% index. There is a bug somewhere, but we don't want
- %% to take down anything just because of this. Let's
- %% process the message as if the copies were in
- %% different queues (fan-out).
- ok;
- Err -> throw({bad_flying_ets_update, Diff, Err, Key})
- catch error:badarg ->
- %% this is guaranteed to succeed since the
- %% server only removes and updates flying_ets
- %% entries; it never inserts them
- true = ets:insert_new(FlyingEts, {Key, Diff})
- end,
- ok
- end.
+%% @todo Can we merge the flying behavior with the cur_cache refc stuff?
+%% If a message is in cur_cache it will be refc so the first time
+%% the {write,...} makes it through we can check cur_cache instead?
+%% If value has 0 refc we don't write?
+%client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+% client_ref = CRef }) ->
+% Key = {MsgId, CRef},
+% %% @todo Use ets:update_counter with a default value.
+% case ets:insert_new(FlyingEts, {Key, Diff}) of
+% true -> ok;
+% false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
+% 0 -> ok;
+% Diff -> ok;
+% Err when Err >= 2 ->
+% %% The message must be referenced twice in the queue
+% %% index. There is a bug somewhere, but we don't want
+% %% to take down anything just because of this. Let's
+% %% process the message as if the copies were in
+% %% different queues (fan-out).
+% ok;
+% Err -> throw({bad_flying_ets_update, Diff, Err, Key})
+% catch error:badarg ->
+% %% this is guaranteed to succeed since the
+% %% server only removes and updates flying_ets
+% %% entries; it never inserts them
+% true = ets:insert_new(FlyingEts, {Key, Diff})
+% end,
+% ok
+% end.
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
@@ -810,10 +878,73 @@ handle_cast({client_delete, CRef},
State1 = State #msstate { clients = maps:remove(CRef, Clients) },
noreply(clear_client(CRef, State1));
-handle_cast({write, CRef, MsgId, Flow},
+%% @todo I think we don't need the flying table as long as we use the value from the cache instead.
+%% We only need to write the message if it wasn't already written in the current file (keep keys?)
+%% and the cache is > 0. We should NOT worry about messages in previous files (?) well we should
+%% because the index can only have a single value so writing to another file doesn't work.
+%% Can we just update_counter the index and if it doesn't exist then write?
+%%
+%% client: - write to ets cache OR increase refc if already exists (which one takes priority? write if not fan-out, refc if fan-out, but can't know yet so write)
+%% - send {write...}
+%% DON'T update_flying anymore! The refc is doing it for us
+%% how does the refc work? we probably should +2 the refc on client_write because it will be decreased by both client ack and write?
+%%
+%% server: - -1 the cache; if result is 0 then we don't need to write (it was already acked)
+%% HMMM but we can't figure things out like this if there's fan-out
+%% so if fan-out we just increase/decrease the index (assuming non-zero) like we do before,
+%% we will just not have the flying optimisation in that case?
+%%
+%%
+%% remove: how do we know it was not written? we just read from the cache and if value is 0... Nope
+%% we want to just -1 client-side and only tell the message store the message is gone if the value is 0 ideally or if the value isn't in the cache (of course)
+%%
+%%
+%% when should values be removed from the cache in that case? when the file rolls over I guess? I guess that's why flying and refc are separate
+
+%% we write to the cache every time we do a {write,...}
+%% we want to avoid writing to disk if not necessary
+
+%% write: to cache with +1 refc and +1 pending_write
+%% {write,...}: +0 refc -1 pending_write
+ %% if refc == 0 -> don't write
+ %% do we need to track pending_write at all?
+ %% what about confirms? maybe never try to do them early?
+%% remove: -1 refc
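+%% One possible shape (hypothetical, assuming the cache object grows a
+%% fourth element for pending writes, i.e. {MsgId, Msg, RefC, Pending}):
+%%   client write:  ets:update_counter(Cache, MsgId, [{3, 1}, {4, 1}])
+%%   server write:  ets:update_counter(Cache, MsgId, [{3, 0}, {4, -1}])
+%%                  and skip the disk write when the refc result is 0
+%%   client remove: ets:update_counter(Cache, MsgId, {3, -1})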
+
+%% write
+%% refc+1
+%% ignore
+%% confirm?
+
+handle_cast({write, CRef, MsgRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
+
+ %% @todo Figure out how multi-write would work out with Flow.
+ %% With noflow, no problem. With flow, we must ack for
+ %% each message we end up writing?
+ %%
+ %% @todo Should we send a message per write? Probably, but
+ %% the message should say "there is something to write"
+ %% and we will look at the ets table?
+
+ %% @todo How do we know which messages were handled and which weren't? update_flying?
+ %% Won't a multi-write prevent the flying (write/remove) optimisation?
+
+ %% @todo We don't want to multi-write we want to multi-confirm, so the update_flying
+ %% we do here we instead want to do it on multiple messages if possible. So
+ %% get all messages from CRef that would end up ignored and do them all at once.
+
case Flow of
flow -> {CPid, _} = maps:get(CRef, Clients),
%% We are going to process a message sent by the
@@ -823,7 +954,7 @@ handle_cast({write, CRef, MsgId, Flow},
noflow -> ok
end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
- case update_flying(-1, MsgId, CRef, State) of
+ case flying_write({CRef, MsgRef}, State) of
process ->
[{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
noreply(write_message(MsgId, Msg, CRef, State));
@@ -841,6 +972,10 @@ handle_cast({write, CRef, MsgId, Flow},
%% current file then the cache entry will be removed by
%% the normal logic for that in write_message/4 and
%% maybe_roll_to_new_file/2.
+ %% @todo OK I think this is the core of the issue with
+ %% the cache. There's got to be a better way that
+ %% allows keeping the cache while not preventing
+ %% fast writes.
case index_lookup(MsgId, State1) of
[#msg_location { file = File }]
when File == State1 #msstate.current_file ->
@@ -854,8 +989,8 @@ handle_cast({write, CRef, MsgId, Flow},
handle_cast({remove, CRef, MsgIds}, State) ->
{RemovedMsgIds, State1} =
lists:foldl(
- fun (MsgId, {Removed, State2}) ->
- case update_flying(+1, MsgId, CRef, State2) of
+ fun ({MsgRef, MsgId}, {Removed, State2}) ->
+ case flying_remove({CRef, MsgRef}, State2) of
process -> {[MsgId | Removed],
remove_message(MsgId, CRef, State2)};
ignore -> {Removed, State2}
@@ -1010,27 +1145,65 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
client_confirm(CRef, MsgIds, written, StateN)
end, State1, CGs).
-update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
- Key = {MsgId, CRef},
- NDiff = -Diff,
+%% ets:match({{_, CRef}, 0})
+%% Delete all these objects.
+%% Check if MsgId is in there. Hmm no we can't assume we process in that case...
+ %% OK we NEVER remove the entry before we look in the case of write. So for write we will always have MsgId in there if it has 0.
+ %% So if MsgId is in there we delete the objects and we ignore.
+ %% But we can't delete the objects randomly since the counter may get updated we have to delete the objects we found explicitly (and only if they still say 0).
+%% OK but how do we avoid sending confirms multiple times then? The write message is coming anyway...
+ %% Hmmm....
+ %% We need to get rid of gen_server2? Probably too hard right now.
+ %% We could keep track of which MsgIds were already processed and ignore the message in that case.
+ %% Or since we delete_object we could just update_flying like before? But if the message was
+ %% already processed we will ignore it and we don't want to send a confirm again... Tough.
+%% (ignore) But we can do the ets:lookup and then if it's an ignore we match for the other ones?
+%% (ignore) But we'll have MORE ets operations in that case because we still have incoming write/remove...
+
+flying_write(Key, #msstate { flying_ets = FlyingEts }) ->
case ets:lookup(FlyingEts, Key) of
- [] -> ignore;
- [{_, Diff}] -> ignore; %% [1]
- [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
- true = ets:delete_object(FlyingEts, {Key, 0}),
- process;
- [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
- ignore;
- [{_, Err}] when Err >= 2 ->
- %% The message must be referenced twice in the queue index. There
- %% is a bug somewhere, but we don't want to take down anything
- %% just because of this. Let's process the message as if the
- %% copies were in different queues (fan-out).
- ets:update_counter(FlyingEts, Key, {2, Diff}),
- true = ets:delete_object(FlyingEts, {Key, 0}),
+ [{_, ?FLYING_WRITE}] ->
+ ets:update_counter(FlyingEts, Key, ?FLYING_WRITE_DONE),
+ %% We only remove the object if it hasn't changed
+ %% (a remove may be sent while we were processing the write).
+ true = ets:delete_object(FlyingEts, {Key, ?FLYING_IS_WRITTEN}),
process;
- [{_, Err}] -> throw({bad_flying_ets_record, Diff, Err, Key})
+ [{_, ?FLYING_IS_IGNORED}] ->
+ ignore
end.
+
+flying_remove(Key, #msstate { flying_ets = FlyingEts }) ->
+ Res = case ets:lookup(FlyingEts, Key) of
+ [{_, ?FLYING_IS_REMOVED}] ->
+ process;
+ [{_, ?FLYING_IS_IGNORED}] ->
+ ignore
+ end,
+ %% We are done with this message, we can unconditionally delete the entry.
+ true = ets:delete(FlyingEts, Key),
+ Res.
+
+%update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+% Key = {MsgId, CRef},
+% NDiff = -Diff,
+% case ets:lookup(FlyingEts, Key) of
+% [] -> ignore; %% @todo How is that possible?! If we have a message we must have an entry...
+% [{_, Diff}] -> ignore; %% [1]
+% [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+% true = ets:delete_object(FlyingEts, {Key, 0}),
+% process;
+% [{_, 0}] -> true = ets:delete_object(FlyingEts, {Key, 0}),
+% ignore;
+% [{_, Err}] when Err >= 2 ->
+% %% The message must be referenced twice in the queue index. There
+% %% is a bug somewhere, but we don't want to take down anything
+% %% just because of this. Let's process the message as if the
+% %% copies were in different queues (fan-out).
+% ets:update_counter(FlyingEts, Key, {2, Diff}),
+% true = ets:delete_object(FlyingEts, {Key, 0}),
+% process;
+% [{_, Err}] -> throw({bad_flying_ets_record, Diff, Err, Key})
+% end.
%% [1] We can get here, for example, in the following scenario: There
%% is a write followed by a remove in flight. The counter will be 0,
%% so on processing the write the server attempts to delete the
@@ -1038,6 +1211,7 @@ update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
%% either insert a new entry, containing +1, or increment the existing
%% entry to +1, thus preventing its removal. Either way therefore when
%% the server processes the read, the counter will be +1.
+%% @todo But why would the client insert the same MsgId twice?
%% The general idea is that when a client of a transient message store is dying,
%% we want to avoid writing messages that would end up being removed immediately
@@ -1050,6 +1224,10 @@ write_action({false, not_found}, _MsgId, State) ->
{write, State};
%% @todo This clause is probably fairly costly when large fan-out is used
%% with both producers and consumers more or less caught up.
+%% We probably should only increment the refcount for the current file?
+%% We know it's not locked, it has a valid total size, etc.
+%% We have to look up the index anyway so we know the file
+%% and can decide without querying file summary.
write_action({Mask, #msg_location { ref_count = 0, file = File,
total_size = TotalSize }},
MsgId, State = #msstate { current_file = CurrentFile,
@@ -1086,6 +1264,9 @@ write_message(MsgId, Msg, CRef,
{write, State1} ->
write_message(MsgId, Msg,
record_pending_confirm(CRef, MsgId, State1));
+    %% @todo Where does the confirm get sent?
+    %%       It doesn't get sent: those messages will not be written or
+    %%       confirmed because the queue that sent them is shutting down.
{ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
State1;
{ignore, _File, State1} ->
@@ -1600,6 +1781,9 @@ maybe_roll_to_new_file(
valid_total_size = 0,
file_size = 0,
locked = false }),
+    %% @todo We only delete entries that have no references left.
+    %%       Does that mean the cache can grow unbounded? No, because we
+    %%       decrease the reference count on {write,...}. But we do get
+    %%       potential memory issues if the store is hammered.
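+    %% Note: the {'_', '_', 0} pattern below only deletes cache entries
+    %% whose reference count reached zero, i.e. entries with no pending
+    %% {write,...} cast left.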
true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
State1 #msstate { current_file_handle = NextHdl,
current_file = NextFile,
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index 36c9c68e00..896303bb0a 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -651,7 +651,7 @@ ack([SeqId], State) ->
false -> {[], IndexState}
end,
StoreState1 = case MsgLocation of
- ?IN_SHARED_STORE -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]), StoreState0;
+ ?IN_SHARED_STORE -> ok = msg_store_remove(MSCState, IsPersistent, [{SeqId, MsgId}]), StoreState0;
?IN_QUEUE_STORE -> rabbit_classic_queue_store_v2:remove(SeqId, StoreState0);
?IN_QUEUE_INDEX -> StoreState0;
?IN_MEMORY -> StoreState0
@@ -1348,11 +1348,11 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, VHost) ->
rabbit_vhost_msg_store:client_init(VHost, MsgStore,
Ref, MsgOnDiskFun).
-msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
+msg_store_write(MSCState, IsPersistent, SeqId, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
fun (MSCState1) ->
- rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ rabbit_msg_store:write_flow(SeqId, MsgId, Msg, MSCState1)
end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
@@ -1706,7 +1706,7 @@ remove(false, MsgStatus = #msg_status {
%% Remove from msg_store and queue index, if necessary
StoreState1 = case MsgLocation of
- ?IN_SHARED_STORE -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]), StoreState0;
+ ?IN_SHARED_STORE -> ok = msg_store_remove(MSCState, IsPersistent, [{SeqId, MsgId}]), StoreState0;
?IN_QUEUE_STORE -> rabbit_classic_queue_store_v2:remove(SeqId, StoreState0);
?IN_QUEUE_INDEX -> StoreState0;
?IN_MEMORY -> StoreState0
@@ -1898,7 +1898,7 @@ remove_queue_entries1(
is_persistent = IsPersistent} = MsgStatus,
{MsgIdsByStore, NextDeliverSeqId, Acks, State}) ->
{case MsgLocation of
- ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+ ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, {SeqId, MsgId}, MsgIdsByStore);
_ -> MsgIdsByStore
end,
next_deliver_seq_id(SeqId, NextDeliverSeqId),
@@ -2038,7 +2038,7 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
disk_write_count = Count})
when Force orelse IsPersistent ->
case persist_to(MsgStatus) of
- msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+ msg_store -> ok = msg_store_write(MSCState, IsPersistent, SeqId, MsgId,
prepare_to_store(Msg)),
{MsgStatus#msg_status{msg_location = ?IN_SHARED_STORE},
State#vqstate{disk_write_count = Count + 1}};
@@ -2318,7 +2318,7 @@ accumulate_ack(#msg_status { seq_id = SeqId,
{IndexOnDiskSeqIdsAcc, MsgIdsByStore, SeqIdsInStore, AllMsgIds}) ->
{cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
case MsgLocation of
- ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+ ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, {SeqId, MsgId}, MsgIdsByStore);
_ -> MsgIdsByStore
end,
case MsgLocation of
@@ -2350,6 +2350,8 @@ sets_subtract(Set1, Set2) ->
end.
msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
+    %% @todo Why does this behave as if both msgs AND indices were written?
+    %%       The indices may not be written yet at this point. Right: the
+    %%       queue already acked the message, so it doesn't matter whether
+    %%       it's written to the index.
Callback(?MODULE,
fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
msgs_written_to_disk(Callback, MsgIdSet, written) ->
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 31b8cff538..ec35fe012f 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -220,8 +220,12 @@ msg_store(Config) ->
?MODULE, msg_store1, [Config]).
msg_store1(_Config) ->
+ %% We simulate the SeqId (used as a message ref for the flying optimisation)
+ %% using the process dictionary.
+    GenRefFun = fun(Key) ->
+                    V = case get(Key) of undefined -> 0; V0 -> V0 end,
+                    put(Key, V + 1),
+                    V
+                end,
+ GenRef = fun() -> GenRefFun(msc) end,
restart_msg_store_empty(),
- MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
+ MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1,100)],
{MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
Ref = rabbit_guid:gen(),
{Cap, MSCState} = msg_store_client_init_capture(
@@ -232,7 +236,7 @@ msg_store1(_Config) ->
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
%% test confirm logic
- passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
+ passed = test_msg_store_confirms([hd(MsgIds)], Cap, GenRef, MSCState),
%% check we don't contain any of the msgs we're about to publish
false = msg_store_contains(false, MsgIds, MSCState),
%% publish the first half
@@ -247,7 +251,7 @@ msg_store1(_Config) ->
%% count code. We need to do this through a 2nd client since a
%% single client is not supposed to write the same message more
%% than once without first removing it.
- ok = msg_store_write(MsgIds2ndHalf, MSC2State),
+ ok = msg_store_write([{GenRefFun(msc2), MsgId} || {_, MsgId} <- MsgIds2ndHalf], MSC2State),
%% check they're still all in there
true = msg_store_contains(true, MsgIds, MSCState),
%% sync on the 2nd half
@@ -275,16 +279,16 @@ msg_store1(_Config) ->
ok = rabbit_variable_queue:stop_msg_store(?VHOST),
ok = rabbit_variable_queue:start_msg_store(?VHOST,
[], {fun ([]) -> finished;
- ([MsgId|MsgIdsTail])
+ ([{_, MsgId}|MsgIdsTail])
when length(MsgIdsTail) rem 2 == 0 ->
{MsgId, 1, MsgIdsTail};
- ([MsgId|MsgIdsTail]) ->
+ ([{_, MsgId}|MsgIdsTail]) ->
{MsgId, 0, MsgIdsTail}
end, MsgIds2ndHalf}),
MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
%% check we have the right msgs left
lists:foldl(
- fun (MsgId, Bool) ->
+ fun ({_, MsgId}, Bool) ->
not(Bool = rabbit_msg_store:contains(MsgId, MSCState5))
end, false, MsgIds2ndHalf),
ok = rabbit_msg_store:client_terminate(MSCState5),
@@ -307,34 +311,41 @@ msg_store1(_Config) ->
{ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
PayloadSizeBits = 65536,
BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
- MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
+ MsgIdsBig = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, BigCount)],
Payload = << 0:PayloadSizeBits >>,
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
fun (MSCStateM) ->
- [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) ||
- MsgId <- MsgIdsBig],
+ [ok = rabbit_msg_store:write(SeqId, MsgId, Payload, MSCStateM) ||
+ {SeqId, MsgId} <- MsgIdsBig],
MSCStateM
end),
%% now read them to ensure we hit the fast client-side reading
ok = foreach_with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
- fun (MsgId, MSCStateM) ->
+ fun ({_, MsgId}, MSCStateM) ->
{{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
MsgId, MSCStateM),
MSCStateN
end, MsgIdsBig),
- %% .., then 3s by 1...
- ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
- %% .., then remove 3s by 2, from the young end first. This hits
- %% GC (under 50% good data left, but no empty files. Must GC).
- ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
- %% .., then remove 3s by 3, from the young end first. This hits
- %% GC...
- ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
- [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+    %% We remove every third message first, then do it again with a second
+    %% set of messages, and then a third time with the rest. We start with
+    %% younger messages on purpose. So we split the list into three lists,
+    %% keeping the message references.
+ Part = fun
+ PartFun([], _, Acc) ->
+ Acc;
+ PartFun([E|Tail], N, Acc) ->
+ Pos = 1 + (N rem 3),
+ AccL = element(Pos, Acc),
+ PartFun(Tail, N + 1, setelement(Pos, Acc, [E|AccL]))
+ end,
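+    %% For example (with placeholder elements): Part([a, b, c, d, e, f], 0,
+    %% {[], [], []}) returns {[d, a], [e, b], [f, c]}; order within each
+    %% list is reversed, which does not matter for removal batches.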
+ {One, Two, Three} = Part(MsgIdsBig, 0, {[], [], []}),
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, One),
+ %% This is likely to hit GC (under 50% good data left in files, but no empty files).
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Two),
+ %% Files are empty now and will get removed.
+ ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Three),
%% ensure empty
ok = with_msg_store_client(
?PERSISTENT_MSG_STORE, Ref,
@@ -343,7 +354,7 @@ msg_store1(_Config) ->
MSCStateM
end),
%%
- passed = test_msg_store_client_delete_and_terminate(),
+ passed = test_msg_store_client_delete_and_terminate(fun() -> GenRefFun(msc_cdat) end),
%% restart empty
restart_msg_store_empty(),
passed.
@@ -379,7 +390,8 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
end
end.
-on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
+on_disk_await(Pid, MsgIds0) when is_list(MsgIds0) ->
+ {_, MsgIds} = lists:unzip(MsgIds0),
Pid ! {await, MsgIds, self()},
receive
{Pid, arrived} -> ok;
@@ -402,25 +414,25 @@ msg_store_client_init_capture(MsgStore, Ref) ->
msg_store_contains(Atom, MsgIds, MSCState) ->
Atom = lists:foldl(
- fun (MsgId, Atom1) when Atom1 =:= Atom ->
+ fun ({_, MsgId}, Atom1) when Atom1 =:= Atom ->
rabbit_msg_store:contains(MsgId, MSCState) end,
Atom, MsgIds).
msg_store_read(MsgIds, MSCState) ->
- lists:foldl(fun (MsgId, MSCStateM) ->
+ lists:foldl(fun ({_, MsgId}, MSCStateM) ->
{{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
MsgId, MSCStateM),
MSCStateN
end, MSCState, MsgIds).
msg_store_write(MsgIds, MSCState) ->
- ok = lists:foldl(fun (MsgId, ok) ->
- rabbit_msg_store:write(MsgId, MsgId, MSCState)
+ ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
+ rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState)
end, ok, MsgIds).
msg_store_write_flow(MsgIds, MSCState) ->
- ok = lists:foldl(fun (MsgId, ok) ->
- rabbit_msg_store:write_flow(MsgId, MsgId, MSCState)
+ ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
+ rabbit_msg_store:write_flow(SeqId, MsgId, MsgId, MSCState)
end, ok, MsgIds).
msg_store_remove(MsgIds, MSCState) ->
@@ -442,36 +454,42 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end,
msg_store_client_init(MsgStore, Ref), L)).
-test_msg_store_confirms(MsgIds, Cap, MSCState) ->
+test_msg_store_confirms(MsgIds, Cap, GenRef, MSCState) ->
%% write -> confirmed
- ok = msg_store_write(MsgIds, MSCState),
- ok = on_disk_await(Cap, MsgIds),
+ MsgIds1 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds1, MSCState),
+ ok = on_disk_await(Cap, MsgIds1),
%% remove -> _
- ok = msg_store_remove(MsgIds, MSCState),
+ ok = msg_store_remove(MsgIds1, MSCState),
ok = on_disk_await(Cap, []),
%% write, remove -> confirmed
- ok = msg_store_write(MsgIds, MSCState),
- ok = msg_store_remove(MsgIds, MSCState),
- ok = on_disk_await(Cap, MsgIds),
+ MsgIds2 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds2, MSCState),
+ ok = msg_store_remove(MsgIds2, MSCState),
+ ok = on_disk_await(Cap, MsgIds2),
%% write, remove, write -> confirmed, confirmed
- ok = msg_store_write(MsgIds, MSCState),
- ok = msg_store_remove(MsgIds, MSCState),
- ok = msg_store_write(MsgIds, MSCState),
- ok = on_disk_await(Cap, MsgIds ++ MsgIds),
+ MsgIds3 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds3, MSCState),
+ ok = msg_store_remove(MsgIds3, MSCState),
+ MsgIds4 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds4, MSCState),
+ ok = on_disk_await(Cap, MsgIds3 ++ MsgIds4),
%% remove, write -> confirmed
- ok = msg_store_remove(MsgIds, MSCState),
- ok = msg_store_write(MsgIds, MSCState),
- ok = on_disk_await(Cap, MsgIds),
+ ok = msg_store_remove(MsgIds4, MSCState),
+ MsgIds5 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds5, MSCState),
+ ok = on_disk_await(Cap, MsgIds5),
%% remove, write, remove -> confirmed
- ok = msg_store_remove(MsgIds, MSCState),
- ok = msg_store_write(MsgIds, MSCState),
- ok = msg_store_remove(MsgIds, MSCState),
- ok = on_disk_await(Cap, MsgIds),
+ ok = msg_store_remove(MsgIds5, MSCState),
+ MsgIds6 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+ ok = msg_store_write(MsgIds6, MSCState),
+ ok = msg_store_remove(MsgIds6, MSCState),
+ ok = on_disk_await(Cap, MsgIds6),
%% confirmation on timer-based sync
- passed = test_msg_store_confirm_timer(),
+ passed = test_msg_store_confirm_timer(GenRef),
passed.
-test_msg_store_confirm_timer() ->
+test_msg_store_confirm_timer(GenRef) ->
Ref = rabbit_guid:gen(),
MsgId = msg_id_bin(1),
Self = self(),
@@ -485,32 +503,34 @@ test_msg_store_confirm_timer() ->
false -> ok
end
end),
- ok = msg_store_write([MsgId], MSCState),
- ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
- ok = msg_store_remove([MsgId], MSCState),
+ MsgIdsChecked = [{GenRef(), MsgId}],
+ ok = msg_store_write(MsgIdsChecked, MSCState),
+ ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], GenRef, MSCState, false),
+ ok = msg_store_remove(MsgIdsChecked, MSCState),
ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
passed.
-msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) ->
+msg_store_keep_busy_until_confirm(MsgIds, GenRef, MSCState, Blocked) ->
After = case Blocked of
false -> 0;
true -> ?MAX_WAIT
end,
Recurse = fun () -> msg_store_keep_busy_until_confirm(
- MsgIds, MSCState, credit_flow:blocked()) end,
+ MsgIds, GenRef, MSCState, credit_flow:blocked()) end,
receive
on_disk -> ok;
{bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
Recurse()
after After ->
- ok = msg_store_write_flow(MsgIds, MSCState),
- ok = msg_store_remove(MsgIds, MSCState),
+ MsgIds1 = [{GenRef(), MsgId} || MsgId <- MsgIds],
+ ok = msg_store_write_flow(MsgIds1, MSCState),
+ ok = msg_store_remove(MsgIds1, MSCState),
Recurse()
end.
-test_msg_store_client_delete_and_terminate() ->
+test_msg_store_client_delete_and_terminate(GenRef) ->
restart_msg_store_empty(),
- MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
+ MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 10)],
Ref = rabbit_guid:gen(),
MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
ok = msg_store_write(MsgIds, MSCState),
@@ -1562,7 +1582,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
MsgId, SeqId, rabbit_msg_store,
#message_properties{size = 10},
Persistent, infinity, QiN),
- ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
+ ok = rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState),
{QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
end, {Qi, []}, SeqIds),
%% do this just to force all of the publishes through to the msg_store: