author     Loïc Hoguin <lhoguin@vmware.com>    2023-03-10 09:48:43 +0100
committer  Loïc Hoguin <lhoguin@vmware.com>    2023-03-14 16:15:54 +0100
commit     a77886a9316b86a22094caadb9ec69d8aa2ceaa9 (patch)
tree       60cd1b253d76c01cb36df273ce552589944438d9
parent     fac8c5df289175d538fd719661efa82737ccd0aa (diff)
WIP flying optim (branch lh-msg-store)
-rw-r--r--  deps/rabbit/src/rabbit_msg_store.erl      | 300
-rw-r--r--  deps/rabbit/src/rabbit_variable_queue.erl |  16
-rw-r--r--  deps/rabbit/test/backing_queue_SUITE.erl  | 138
3 files changed, 330 insertions, 124 deletions
diff --git a/deps/rabbit/src/rabbit_msg_store.erl b/deps/rabbit/src/rabbit_msg_store.erl
index 0784d80ba0..a030e47b71 100644
--- a/deps/rabbit/src/rabbit_msg_store.erl
+++ b/deps/rabbit/src/rabbit_msg_store.erl
@@ -12,7 +12,7 @@
 -export([start_link/5, successfully_recovered_state/1,
          client_init/3, client_terminate/1, client_delete_and_terminate/1,
          client_ref/1,
-         write/3, write_flow/3, read/2, read_many/2, contains/2, remove/2]).
+         write/4, write_flow/4, read/2, read_many/2, contains/2, remove/2]).
 
 -export([set_maximum_since_use/2, compact_file/2, truncate_file/4, delete_file/2]). %% internal
 
@@ -45,6 +45,47 @@
 %% i.e. two pairs, so GC does not go idle when busy
 -define(MAXIMUM_SIMULTANEOUS_GC_FILES, 4).
 
+%% Flying events. They get added as things happen. The entry in the table
+%% may get removed after the server write or after the server remove. When
+%% the entry is removed after a write, a later add for the same key starts
+%% from the same value as if the entry had never been removed.
+%%
+%% So the possible values are only:
+%% - 1: client write
+%% - 3: client and server write
+%% - 7: client and server write, client remove before the entry could be deleted
+%%
+%% Values 1 and 7 indicate there is a message in flight.
+
+%% @todo
+%% We want to keep track of pending messages; we don't care about what the server
+%% has done with them (it will delete the object once it's done, if possible).
+%% So we only need two values: a write is in flight and a remove is in flight.
+
+-define(FLYING_WRITE, 1).      %% ets:insert_new? Not strictly necessary but worth doing.
+-define(FLYING_WRITE_DONE, 2). %% ets:update_counter followed by a tentative remove.
+-define(FLYING_REMOVE, 4).     %% ets:update_counter.
+%% Useful states.
+-define(FLYING_IS_WRITTEN, ?FLYING_WRITE + ?FLYING_WRITE_DONE).                  %% Write was handled.
+-define(FLYING_IS_IGNORED, ?FLYING_WRITE + ?FLYING_REMOVE).                      %% Remove before write was handled.
+-define(FLYING_IS_REMOVED, ?FLYING_WRITE + ?FLYING_WRITE_DONE + ?FLYING_REMOVE). %% Remove was handled.
+
+%% @todo OK so on write we can just insert_new and on remove we can just update_counter with a default.
+%% @todo On server write we look up and then later delete_object with value 3.
+%%       We could do a match of CRef + value 1 to get all flying messages to do multi-write
+%%       if we wanted to, and group the confirms.
+%% @todo On server remove we look up and then delete by key.
+%%
+%% @todo What to do if a value is not as expected? Can this happen? Probably not?
+%%       Maybe not on store crash, but what about queue crash? It's possible that
+%%       in that case we get a flying entry but not a message, so we could leak
+%%       that way...
+
+%% We keep track of flying messages for writes and removes. The idea is that
+%% when a remove comes in before we could process the write, we skip the
+%% write and send a publisher confirm immediately. We later skip the remove
+%% as well since we didn't process the write.
+
 %%----------------------------------------------------------------------------
 
 -record(msstate,
@@ -417,9 +458,9 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
 client_ref(#client_msstate { client_ref = Ref }) -> Ref.
 
--spec write_flow(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+-spec write_flow(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
 
-write_flow(MsgId, Msg,
+write_flow(MsgRef, MsgId, Msg,
            CState = #client_msstate {
                        server = Server,
                        credit_disc_bound = CreditDiscBound }) ->
@@ -428,11 +469,12 @@ write_flow(MsgId, Msg,
     %% rabbit_variable_queue. We are accessing the
     %% rabbit_amqqueue_process process dictionary.
     credit_flow:send(Server, CreditDiscBound),
-    client_write(MsgId, Msg, flow, CState).
+    client_write(MsgRef, MsgId, Msg, flow, CState).
 
--spec write(rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
+-spec write(any(), rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'.
 
-write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
+%% This function is only used by tests.
+write(MsgRef, MsgId, Msg, CState) -> client_write(MsgRef, MsgId, Msg, noflow, CState).
 
 -spec read(rabbit_types:msg_id(), client_msstate()) ->
                       {rabbit_types:ok(msg()) | 'not_found', client_msstate()}.
@@ -452,6 +494,20 @@ read(MsgId,
             {{ok, Msg}, CState}
     end.
 
+%% write to cache
+%% cast {write,...}
+%% some other write makes the file roll over and delete everything from the cache
+%%   NOPE, NOT EVERYTHING: IT DOESN'T DELETE IF THE ENTRY WASN'T WRITTEN YET (or ignored)
+%% read from cache: entry not there
+%% read from index: {write,...} not processed yet
+%% crash NOPE!!! SEE ABOVE
+
+%% So the not_found is impossible unless there's a bug.
+
+%% What we want: the cache shouldn't be completely wiped, only the entries that don't match the current file.
+%% Maybe use a timestamp to allow us to only remove what was fully written, and set that timestamp
+%% in both the cache entry and {write,...}, but I don't think that is good enough.
+%% Nope. OK, what else can be used? We can track which messages were truly written and only remove
+%% those from the cache? But doing that in the main process could be slow; maybe do it in the GC process?
+
 -spec read_many([rabbit_types:msg_id()], client_msstate()) -> #{rabbit_types:msg_id() => msg()}.
 
 %% We disable read_many when the index module is not ETS for the time being.
@@ -552,8 +608,12 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}).
 -spec remove([rabbit_types:msg_id()], client_msstate()) -> 'ok'.
 
 remove([], _CState) -> ok;
-remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
-    [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
+remove(MsgIds, CState = #client_msstate { flying_ets = FlyingEts,
+                                          client_ref = CRef }) ->
+    %% @todo OK, we write 1 at a time, but maybe we can avoid doing N ets updates for N messages...
+    %% @todo No we can't, because an insert_new would insert nothing if even only one already exists...
+    %% If the entry was deleted we act as if it wasn't, by using the right default.
+    [ets:update_counter(FlyingEts, {CRef, MsgRef}, ?FLYING_REMOVE, {'', ?FLYING_IS_WRITTEN}) || {MsgRef, _} <- MsgIds],
     server_cast(CState, {remove, CRef, MsgIds}).
 
 -spec set_maximum_since_use(server(), non_neg_integer()) -> 'ok'.
@@ -571,13 +631,16 @@ server_call(#client_msstate { server = Server }, Msg) ->
 server_cast(#client_msstate { server = Server }, Msg) ->
     gen_server2:cast(Server, Msg).
 
-client_write(MsgId, Msg, Flow,
-             CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+client_write(MsgRef, MsgId, Msg, Flow,
+             CState = #client_msstate { flying_ets = FlyingEts,
+                                        cur_file_cache_ets = CurFileCacheEts,
                                         client_ref = CRef }) ->
     file_handle_cache_stats:update(msg_store_write),
-    ok = client_update_flying(+1, MsgId, CState),
+    %% We are guaranteed that the insert will succeed.
+    %% This is true even for queue crashes because CRef will change.
+    true = ets:insert_new(FlyingEts, {{CRef, MsgRef}, ?FLYING_WRITE}),
     ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
-    ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+    ok = server_cast(CState, {write, CRef, MsgRef, MsgId, Flow}).
 
 %% We no longer check for whether the message's file is locked because we
 %% rely on the fact that the file gets removed only when there are no
@@ -604,30 +667,35 @@ client_read3(#msg_location { msg_id = MsgId, file = File },
             {{ok, Msg}, CState1}
     end.
 
-client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
-                                                    client_ref = CRef }) ->
-    Key = {MsgId, CRef},
-    case ets:insert_new(FlyingEts, {Key, Diff}) of
-        true  -> ok;
-        false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
-                     0    -> ok;
-                     Diff -> ok;
-                     Err when Err >= 2 ->
-                         %% The message must be referenced twice in the queue
-                         %% index. There is a bug somewhere, but we don't want
-                         %% to take down anything just because of this. Let's
-                         %% process the message as if the copies were in
-                         %% different queues (fan-out).
-                         ok;
-                     Err  -> throw({bad_flying_ets_update, Diff, Err, Key})
-                 catch error:badarg ->
-                         %% this is guaranteed to succeed since the
-                         %% server only removes and updates flying_ets
-                         %% entries; it never inserts them
-                         true = ets:insert_new(FlyingEts, {Key, Diff})
-                 end,
-                 ok
-    end.
+%% @todo Can we merge the flying behavior with the cur_cache refc stuff?
+%%       If a message is in cur_cache it will be refc'd, so the first time
+%%       the {write,...} makes it through we can check cur_cache instead?
+%%       If the value has a 0 refc we don't write?
+%client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
+%                                                    client_ref = CRef }) ->
+%    Key = {MsgId, CRef},
+%    %% @todo Use ets:update_counter with a default value.
+%    case ets:insert_new(FlyingEts, {Key, Diff}) of
+%        true  -> ok;
+%        false -> try ets:update_counter(FlyingEts, Key, {2, Diff}) of
+%                     0    -> ok;
+%                     Diff -> ok;
+%                     Err when Err >= 2 ->
+%                         %% The message must be referenced twice in the queue
+%                         %% index. There is a bug somewhere, but we don't want
+%                         %% to take down anything just because of this. Let's
+%                         %% process the message as if the copies were in
+%                         %% different queues (fan-out).
+%                         ok;
+%                     Err  -> throw({bad_flying_ets_update, Diff, Err, Key})
+%                 catch error:badarg ->
+%                         %% this is guaranteed to succeed since the
+%                         %% server only removes and updates flying_ets
+%                         %% entries; it never inserts them
+%                         true = ets:insert_new(FlyingEts, {Key, Diff})
+%                 end,
+%                 ok
+%    end.
 
 clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
                                       dying_clients = DyingClients }) ->
@@ -810,10 +878,73 @@ handle_cast({client_delete, CRef},
     State1 = State #msstate { clients = maps:remove(CRef, Clients) },
     noreply(clear_client(CRef, State1));
 
-handle_cast({write, CRef, MsgId, Flow},
+%% @todo I think we don't need the flying table as long as we use the value from the cache instead.
+%%       We only need to write the message if it wasn't already written in the current file (keep keys?)
+%%       and the cache is > 0. We should NOT worry about messages in previous files (?). Well, we should,
+%%       because the index can only have a single value, so writing to another file doesn't work.
+%%       Can we just update_counter the index and, if it doesn't exist, then write?
+%%
+%% client: - write to ets cache OR increase refc if it already exists (which one takes priority?
+%%           write if not fan-out, refc if fan-out, but we can't know yet, so write)
+%%         - send {write,...}
+%%         - DON'T update_flying anymore! The refc is doing it for us.
+%%           How does the refc work? We probably should +2 the refc on client_write because it
+%%           will be decreased by both client ack and write?
+%%
+%% server: - -1 the cache; if the result is 0 then we don't need to write (it was already acked)
+%%           HMMM, but we can't figure things out like this if there's fan-out,
+%%           so if fan-out we just increase/decrease the index (assuming non-zero) like we do before;
+%%           we will just not have the flying optimisation in that case?
+%%
+%% remove: how do we know it was not written? We just read from the cache and if the value is 0... Nope.
+%%         We want to just -1 client-side and only tell the message store the message is gone if the
+%%         value is 0, ideally, or if the value isn't in the cache (of course).
+%%
+%% When should values be removed from the cache in that case? When the file rolls over, I guess?
+%% I guess that's why flying and refc are separate.
+
+%% we write to cache every time we do a {write,...}
+%% we want to avoid writing to disk if not necessary
+
+%% write: to cache with +1 refc and +1 pending_write
+%% {write,...}: +0 refc, -1 pending_write
+%%     if refc == 0 -> don't write
+%%     do we need to track pending_write at all?
+%%     what about confirms? maybe never try to do them early?
+%% remove: -1 refc
+
+%% write
+%% refc+1
+%% ignore
+%% confirm?
+
+handle_cast({write, CRef, MsgRef, MsgId, Flow},
             State = #msstate { cur_file_cache_ets = CurFileCacheEts,
                                clients            = Clients,
                                credit_disc_bound  = CreditDiscBound }) ->
+
+    %% @todo Figure out how multi-write would work out with Flow.
+    %%       With noflow, no problem. With flow, we must ack for
+    %%       each message we end up writing?
+    %%
+    %% @todo Should we send a message per write? Probably, but
+    %%       the message should say "there is something to write"
+    %%       and we will look at the ets table?
+
+    %% @todo How do we know which messages were handled and which weren't? update_flying?
+    %%       Won't a multi-write prevent the flying (write/remove) optimisation?
+
+    %% @todo We don't want to multi-write, we want to multi-confirm, so the update_flying
+    %%       we do here we instead want to do on multiple messages if possible. So
+    %%       get all messages from CRef that would end up ignored and do them all at once.
+
     case Flow of
         flow   -> {CPid, _} = maps:get(CRef, Clients),
                   %% We are going to process a message sent by the
@@ -823,7 +954,7 @@ handle_cast({write, CRef, MsgId, Flow},
         noflow -> ok
     end,
    true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
-    case update_flying(-1, MsgId, CRef, State) of
+    case flying_write({CRef, MsgRef}, State) of
         process ->
             [{MsgId, Msg, _PWC}] = ets:lookup(CurFileCacheEts, MsgId),
             noreply(write_message(MsgId, Msg, CRef, State));
@@ -841,6 +972,10 @@ handle_cast({write, CRef, MsgId, Flow},
             %% current file then the cache entry will be removed by
             %% the normal logic for that in write_message/4 and
             %% maybe_roll_to_new_file/2.
+            %% @todo OK, I think this is the core of the issue with
+            %%       the cache. There's got to be a better way that
+            %%       allows keeping the cache while not preventing
+            %%       fast writes.
             case index_lookup(MsgId, State1) of
                 [#msg_location { file = File }]
                   when File == State1 #msstate.current_file ->
@@ -854,8 +989,8 @@ handle_cast({write, CRef, MsgId, Flow},
 handle_cast({remove, CRef, MsgIds}, State) ->
     {RemovedMsgIds, State1} =
         lists:foldl(
-          fun (MsgId, {Removed, State2}) ->
-                  case update_flying(+1, MsgId, CRef, State2) of
+          fun ({MsgRef, MsgId}, {Removed, State2}) ->
+                  case flying_remove({CRef, MsgRef}, State2) of
                       process -> {[MsgId | Removed],
                                   remove_message(MsgId, CRef, State2)};
                       ignore  -> {Removed, State2}
@@ -1010,27 +1145,65 @@ internal_sync(State = #msstate { current_file_handle = CurHdl,
                   client_confirm(CRef, MsgIds, written, StateN)
           end, State1, CGs).
 
-update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
-    Key = {MsgId, CRef},
-    NDiff = -Diff,
+%% ets:match({{_, CRef}, 0})
+%% Delete all these objects.
+%% Check if MsgId is in there. Hmm, no, we can't assume we process in that case...
+%%     OK, we NEVER remove the entry before we look in the case of write. So for write
+%%     we will always have MsgId in there if it has 0.
+%%     So if MsgId is in there we delete the objects and we ignore.
+%%     But we can't delete the objects randomly since the counter may get updated; we have
+%%     to delete the objects we found explicitly (and only if they still say 0).
+%% OK, but how do we avoid sending confirms multiple times then? The write message is coming anyway...
+%%     Hmmm....
+%%     We need to get rid of gen_server2? Probably too hard right now.
+%%     We could keep track of which MsgIds were already processed and ignore the message in that case.
+%%     Or since we delete_object we could just update_flying like before? But if the message was
+%%     already processed we will ignore it and we don't want to send a confirm again... Tough.
+%% (ignore) But we can do the ets:lookup and then if it's an ignore we match for the other ones?
+%% (ignore) But we'll have MORE ets operations in that case because we still have incoming write/remove...
+
+flying_write(Key, #msstate { flying_ets = FlyingEts }) ->
     case ets:lookup(FlyingEts, Key) of
-        []           -> ignore;
-        [{_, Diff}]  -> ignore; %% [1]
-        [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
-                        true = ets:delete_object(FlyingEts, {Key, 0}),
-                        process;
-        [{_, 0}]     -> true = ets:delete_object(FlyingEts, {Key, 0}),
-                        ignore;
-        [{_, Err}] when Err >= 2 ->
-            %% The message must be referenced twice in the queue index. There
-            %% is a bug somewhere, but we don't want to take down anything
-            %% just because of this. Let's process the message as if the
-            %% copies were in different queues (fan-out).
-            ets:update_counter(FlyingEts, Key, {2, Diff}),
-            true = ets:delete_object(FlyingEts, {Key, 0}),
+        [{_, ?FLYING_WRITE}] ->
+            ets:update_counter(FlyingEts, Key, ?FLYING_WRITE_DONE),
+            %% We only remove the object if it hasn't changed
+            %% (a remove may be sent while we were processing the write).
+            true = ets:delete_object(FlyingEts, {Key, ?FLYING_IS_WRITTEN}),
             process;
-        [{_, Err}]   -> throw({bad_flying_ets_record, Diff, Err, Key})
+        [{_, ?FLYING_IS_IGNORED}] ->
+            ignore
     end.
+
+flying_remove(Key, #msstate { flying_ets = FlyingEts }) ->
+    Res = case ets:lookup(FlyingEts, Key) of
+        [{_, ?FLYING_IS_REMOVED}] ->
+            process;
+        [{_, ?FLYING_IS_IGNORED}] ->
+            ignore
    end,
+    %% We are done with this message, we can unconditionally delete the entry.
+    true = ets:delete(FlyingEts, Key),
+    Res.
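
[Editor example] The FLYING_* macros above form a small bit-flag state machine over a single ets counter: the client write inserts 1, the server adds 2 once it has handled the write, and the client remove adds 4. So 3 means the write was handled, 5 means the remove arrived before the write was processed (both operations get skipped and the confirm is sent immediately), and 7 means write and remove were both handled. A minimal standalone sketch of the remove-before-write path follows; it is not part of the patch, and the table and key names are made up for illustration:

    -module(flying_sketch).
    -export([remove_before_write/0]).

    -define(FLYING_WRITE, 1).
    -define(FLYING_WRITE_DONE, 2).
    -define(FLYING_REMOVE, 4).

    remove_before_write() ->
        T = ets:new(flying, [set]),
        Key = {client_ref, msg_ref},
        %% Client write: insert_new is guaranteed to succeed for a fresh ref.
        true = ets:insert_new(T, {Key, ?FLYING_WRITE}),
        %% Client remove before the server handled the write: 1 + 4 = 5.
        %% The default object covers the case where the server had already
        %% handled the write and deleted the entry (3 + 4 = 7 instead).
        5 = ets:update_counter(T, Key, ?FLYING_REMOVE,
                               {Key, ?FLYING_WRITE + ?FLYING_WRITE_DONE}),
        %% The server write now sees 5 (?FLYING_IS_IGNORED) and skips the
        %% write; the server remove sees the same value, skips the remove,
        %% and deletes the entry, so the message never reaches the disk.
        [{Key, 5}] = ets:lookup(T, Key),
        true = ets:delete(T, Key),
        ets:delete(T),
        ok.
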
+
+%update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
+%    Key = {MsgId, CRef},
+%    NDiff = -Diff,
+%    case ets:lookup(FlyingEts, Key) of
+%        []           -> ignore; %% @todo How is that possible?! If we have a message we must have an entry...
+%        [{_, Diff}]  -> ignore; %% [1]
+%        [{_, NDiff}] -> ets:update_counter(FlyingEts, Key, {2, Diff}),
+%                        true = ets:delete_object(FlyingEts, {Key, 0}),
+%                        process;
+%        [{_, 0}]     -> true = ets:delete_object(FlyingEts, {Key, 0}),
+%                        ignore;
+%        [{_, Err}] when Err >= 2 ->
+%            %% The message must be referenced twice in the queue index. There
+%            %% is a bug somewhere, but we don't want to take down anything
+%            %% just because of this. Let's process the message as if the
+%            %% copies were in different queues (fan-out).
+%            ets:update_counter(FlyingEts, Key, {2, Diff}),
+%            true = ets:delete_object(FlyingEts, {Key, 0}),
+%            process;
+%        [{_, Err}]   -> throw({bad_flying_ets_record, Diff, Err, Key})
+%    end.
 %% [1] We can get here, for example, in the following scenario: There
 %% is a write followed by a remove in flight. The counter will be 0,
 %% so on processing the write the server attempts to delete the
@@ -1038,6 +1211,7 @@ update_flying(Diff, MsgId, CRef, #msstate { flying_ets = FlyingEts }) ->
 %% either insert a new entry, containing +1, or increment the existing
 %% entry to +1, thus preventing its removal. Either way therefore when
 %% the server processes the read, the counter will be +1.
+%% @todo But why would the client insert the same MsgId twice?
 
 %% The general idea is that when a client of a transient message store is dying,
 %% we want to avoid writing messages that would end up being removed immediately
@@ -1050,6 +1224,10 @@ write_action({false, not_found}, _MsgId, State) ->
     {write, State};
 %% @todo This clause is probably fairly costly when large fan-out is used
 %% with both producers and consumers more or less caught up.
+%% We probably should only increment the refcount for the current file?
+%% We know it's not locked, it has a valid total size, etc.
+%% We have to look up the index anyway, so we know the file
+%% and can decide without querying the file summary.
 write_action({Mask, #msg_location { ref_count = 0, file = File,
                                     total_size = TotalSize }},
              MsgId, State = #msstate { current_file = CurrentFile,
@@ -1086,6 +1264,9 @@ write_message(MsgId, Msg, CRef,
         {write, State1} ->
             write_message(MsgId, Msg,
                           record_pending_confirm(CRef, MsgId, State1));
+        %% @todo Where do the confirms get sent?
+        %%       They aren't: those messages will not get written or confirmed
+        %%       because the queue that sent them is shutting down.
         {ignore, CurFile, State1 = #msstate { current_file = CurFile }} ->
             State1;
         {ignore, _File, State1} ->
@@ -1600,6 +1781,9 @@ maybe_roll_to_new_file(
                                valid_total_size = 0,
                                file_size        = 0,
                                locked           = false }),
+    %% @todo We only delete those that have no reference???
+    %%       Does that mean the cache can grow unbounded? No, because we decrease
+    %%       the reference on {write,...}. But we do get potential memory issues
+    %%       if the store is hammered.
     true = ets:match_delete(CurFileCacheEts, {'_', '_', 0}),
     State1 #msstate { current_file_handle = NextHdl,
                       current_file        = NextFile,
diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl
index 36c9c68e00..896303bb0a 100644
--- a/deps/rabbit/src/rabbit_variable_queue.erl
+++ b/deps/rabbit/src/rabbit_variable_queue.erl
@@ -651,7 +651,7 @@ ack([SeqId], State) ->
                        false -> {[], IndexState}
                    end,
     StoreState1 = case MsgLocation of
-        ?IN_SHARED_STORE  -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]), StoreState0;
+        ?IN_SHARED_STORE  -> ok = msg_store_remove(MSCState, IsPersistent, [{SeqId, MsgId}]), StoreState0;
         ?IN_QUEUE_STORE   -> rabbit_classic_queue_store_v2:remove(SeqId, StoreState0);
         ?IN_QUEUE_INDEX   -> StoreState0;
         ?IN_MEMORY        -> StoreState0
@@ -1348,11 +1348,11 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, VHost) ->
       rabbit_vhost_msg_store:client_init(VHost, MsgStore, Ref, MsgOnDiskFun).
 
-msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
+msg_store_write(MSCState, IsPersistent, SeqId, MsgId, Msg) ->
     with_immutable_msg_store_state(
       MSCState, IsPersistent,
       fun (MSCState1) ->
-              rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+              rabbit_msg_store:write_flow(SeqId, MsgId, Msg, MSCState1)
       end).
 
 msg_store_read(MSCState, IsPersistent, MsgId) ->
@@ -1706,7 +1706,7 @@ remove(false, MsgStatus = #msg_status {
     %% Remove from msg_store and queue index, if necessary
     StoreState1 = case MsgLocation of
-        ?IN_SHARED_STORE  -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]), StoreState0;
+        ?IN_SHARED_STORE  -> ok = msg_store_remove(MSCState, IsPersistent, [{SeqId, MsgId}]), StoreState0;
         ?IN_QUEUE_STORE   -> rabbit_classic_queue_store_v2:remove(SeqId, StoreState0);
         ?IN_QUEUE_INDEX   -> StoreState0;
         ?IN_MEMORY        -> StoreState0
@@ -1898,7 +1898,7 @@ remove_queue_entries1(
                is_persistent = IsPersistent} = MsgStatus,
              {MsgIdsByStore, NextDeliverSeqId, Acks, State}) ->
     {case MsgLocation of
-         ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+         ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, {SeqId, MsgId}, MsgIdsByStore);
         _                -> MsgIdsByStore
      end,
      next_deliver_seq_id(SeqId, NextDeliverSeqId),
@@ -2038,7 +2038,7 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
                                disk_write_count = Count})
   when Force orelse IsPersistent ->
     case persist_to(MsgStatus) of
-        msg_store   -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
+        msg_store   -> ok = msg_store_write(MSCState, IsPersistent, SeqId, MsgId,
                                             prepare_to_store(Msg)),
                        {MsgStatus#msg_status{msg_location = ?IN_SHARED_STORE},
                         State#vqstate{disk_write_count = Count + 1}};
@@ -2318,7 +2318,7 @@ accumulate_ack(#msg_status { seq_id        = SeqId,
               {IndexOnDiskSeqIdsAcc, MsgIdsByStore, SeqIdsInStore, AllMsgIds}) ->
     {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
      case MsgLocation of
-         ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore);
+         ?IN_SHARED_STORE -> rabbit_misc:maps_cons(IsPersistent, {SeqId, MsgId}, MsgIdsByStore);
         _                -> MsgIdsByStore
     end,
     case MsgLocation of
@@ -2350,6 +2350,8 @@ sets_subtract(Set1, Set2) ->
     end.
 
 msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
+    %% @todo Why does this behave like when msgs AND indices are written? Indices may not be written yet here?
+    %%       Right, that's because the queue already acked it, so it doesn't matter whether it's written to the index.
     Callback(?MODULE,
              fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
 msgs_written_to_disk(Callback, MsgIdSet, written) ->
diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl
index 31b8cff538..ec35fe012f 100644
--- a/deps/rabbit/test/backing_queue_SUITE.erl
+++ b/deps/rabbit/test/backing_queue_SUITE.erl
@@ -220,8 +220,12 @@ msg_store(Config) ->
       ?MODULE, msg_store1, [Config]).
 
 msg_store1(_Config) ->
+    %% We simulate the SeqId (used as a message ref for the flying optimisation)
+    %% using the process dictionary.
+    GenRefFun = fun(Key) -> V = case get(Key) of undefined -> 0; V0 -> V0 end, put(Key, V + 1), V end,
+    GenRef = fun() -> GenRefFun(msc) end,
     restart_msg_store_empty(),
-    MsgIds = [msg_id_bin(M) || M <- lists:seq(1,100)],
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1,100)],
     {MsgIds1stHalf, MsgIds2ndHalf} = lists:split(length(MsgIds) div 2, MsgIds),
     Ref = rabbit_guid:gen(),
     {Cap, MSCState} = msg_store_client_init_capture(
@@ -232,7 +236,7 @@ msg_store1(_Config) ->
     %% check we don't contain any of the msgs we're about to publish
     false = msg_store_contains(false, MsgIds, MSCState),
     %% test confirm logic
-    passed = test_msg_store_confirms([hd(MsgIds)], Cap, MSCState),
+    passed = test_msg_store_confirms([hd(MsgIds)], Cap, GenRef, MSCState),
     %% check we don't contain any of the msgs we're about to publish
     false = msg_store_contains(false, MsgIds, MSCState),
     %% publish the first half
@@ -247,7 +251,7 @@ msg_store1(_Config) ->
     %% count code. We need to do this through a 2nd client since a
     %% single client is not supposed to write the same message more
     %% than once without first removing it.
-    ok = msg_store_write(MsgIds2ndHalf, MSC2State),
+    ok = msg_store_write([{GenRefFun(msc2), MsgId} || {_, MsgId} <- MsgIds2ndHalf], MSC2State),
     %% check they're still all in there
     true = msg_store_contains(true, MsgIds, MSCState),
     %% sync on the 2nd half
@@ -275,16 +279,16 @@ msg_store1(_Config) ->
     ok = rabbit_variable_queue:stop_msg_store(?VHOST),
     ok = rabbit_variable_queue:start_msg_store(?VHOST, [],
                                                {fun ([]) -> finished;
-                                                    ([MsgId|MsgIdsTail])
+                                                    ([{_, MsgId}|MsgIdsTail])
                                                       when length(MsgIdsTail) rem 2 == 0 ->
                                                         {MsgId, 1, MsgIdsTail};
-                                                    ([MsgId|MsgIdsTail]) ->
+                                                    ([{_, MsgId}|MsgIdsTail]) ->
                                                         {MsgId, 0, MsgIdsTail}
                                                 end, MsgIds2ndHalf}),
     MSCState5 = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
     %% check we have the right msgs left
     lists:foldl(
-      fun (MsgId, Bool) ->
+      fun ({_, MsgId}, Bool) ->
              not(Bool = rabbit_msg_store:contains(MsgId, MSCState5))
       end, false, MsgIds2ndHalf),
     ok = rabbit_msg_store:client_terminate(MSCState5),
@@ -307,34 +311,41 @@ msg_store1(_Config) ->
     {ok, FileSize} = application:get_env(rabbit, msg_store_file_size_limit),
     PayloadSizeBits = 65536,
     BigCount = trunc(100 * FileSize / (PayloadSizeBits div 8)),
-    MsgIdsBig = [msg_id_bin(X) || X <- lists:seq(1, BigCount)],
+    MsgIdsBig = [{GenRef(), msg_id_bin(X)} || X <- lists:seq(1, BigCount)],
     Payload = << 0:PayloadSizeBits >>,
     ok = with_msg_store_client(
            ?PERSISTENT_MSG_STORE, Ref,
           fun (MSCStateM) ->
-                   [ok = rabbit_msg_store:write(MsgId, Payload, MSCStateM) ||
-                       MsgId <- MsgIdsBig],
+                   [ok = rabbit_msg_store:write(SeqId, MsgId, Payload, MSCStateM) ||
+                       {SeqId, MsgId} <- MsgIdsBig],
                   MSCStateM
           end),
     %% now read them to ensure we hit the fast client-side reading
     ok = foreach_with_msg_store_client(
           ?PERSISTENT_MSG_STORE, Ref,
-           fun (MsgId, MSCStateM) ->
+           fun ({_, MsgId}, MSCStateM) ->
                   {{ok, Payload}, MSCStateN} = rabbit_msg_store:read(
                                                  MsgId, MSCStateM),
                   MSCStateN
           end,
           MsgIdsBig),
-    %% .., then 3s by 1...
-    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
-                          [msg_id_bin(X) || X <- lists:seq(BigCount, 1, -3)]),
-    %% .., then remove 3s by 2, from the young end first. This hits
-    %% GC (under 50% good data left, but no empty files. Must GC).
-    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
-                          [msg_id_bin(X) || X <- lists:seq(BigCount-1, 1, -3)]),
-    %% .., then remove 3s by 3, from the young end first. This hits
-    %% GC...
-    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref,
-                          [msg_id_bin(X) || X <- lists:seq(BigCount-2, 1, -3)]),
+    %% We remove every third message first, then do it again with a second
+    %% set of messages, and then a third time with the rest. We start with
+    %% younger messages on purpose. So we split the list into three lists,
+    %% keeping the message references.
+    Part = fun
+        PartFun([], _, Acc) ->
+            Acc;
+        PartFun([E|Tail], N, Acc) ->
+            Pos = 1 + (N rem 3),
+            AccL = element(Pos, Acc),
+            PartFun(Tail, N + 1, setelement(Pos, Acc, [E|AccL]))
+    end,
+    {One, Two, Three} = Part(MsgIdsBig, 0, {[], [], []}),
+    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, One),
+    %% This is likely to hit GC (under 50% good data left in files, but no empty files).
+    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Two),
+    %% Files are empty now and will get removed.
+    ok = msg_store_remove(?PERSISTENT_MSG_STORE, Ref, Three),
     %% ensure empty
     ok = with_msg_store_client(
           ?PERSISTENT_MSG_STORE, Ref,
@@ -343,7 +354,7 @@ msg_store1(_Config) ->
                   MSCStateM
           end),
     %%
-    passed = test_msg_store_client_delete_and_terminate(),
+    passed = test_msg_store_client_delete_and_terminate(fun() -> GenRefFun(msc_cdat) end),
     %% restart empty
     restart_msg_store_empty(),
     passed.
@@ -379,7 +390,8 @@ on_disk_capture(OnDisk, Awaiting, Pid) ->
         end
     end.
 
-on_disk_await(Pid, MsgIds) when is_list(MsgIds) ->
+on_disk_await(Pid, MsgIds0) when is_list(MsgIds0) ->
+    {_, MsgIds} = lists:unzip(MsgIds0),
     Pid ! {await, MsgIds, self()},
     receive
         {Pid, arrived} -> ok;
@@ -402,25 +414,25 @@ msg_store_client_init_capture(MsgStore, Ref) ->
 msg_store_contains(Atom, MsgIds, MSCState) ->
     Atom = lists:foldl(
-             fun (MsgId, Atom1) when Atom1 =:= Atom ->
+             fun ({_, MsgId}, Atom1) when Atom1 =:= Atom ->
                     rabbit_msg_store:contains(MsgId, MSCState)
             end, Atom, MsgIds).
 
 msg_store_read(MsgIds, MSCState) ->
-    lists:foldl(fun (MsgId, MSCStateM) ->
+    lists:foldl(fun ({_, MsgId}, MSCStateM) ->
                        {{ok, MsgId}, MSCStateN} = rabbit_msg_store:read(
                                                     MsgId, MSCStateM),
                        MSCStateN
               end, MSCState, MsgIds).
 
 msg_store_write(MsgIds, MSCState) ->
-    ok = lists:foldl(fun (MsgId, ok) ->
-                             rabbit_msg_store:write(MsgId, MsgId, MSCState)
+    ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
+                             rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState)
                     end, ok, MsgIds).
 
 msg_store_write_flow(MsgIds, MSCState) ->
-    ok = lists:foldl(fun (MsgId, ok) ->
-                             rabbit_msg_store:write_flow(MsgId, MsgId, MSCState)
+    ok = lists:foldl(fun ({SeqId, MsgId}, ok) ->
+                             rabbit_msg_store:write_flow(SeqId, MsgId, MsgId, MSCState)
                     end, ok, MsgIds).
 
 msg_store_remove(MsgIds, MSCState) ->
@@ -442,36 +454,42 @@ foreach_with_msg_store_client(MsgStore, Ref, Fun, L) ->
     lists:foldl(fun (MsgId, MSCState) -> Fun(MsgId, MSCState) end,
                 msg_store_client_init(MsgStore, Ref), L)).
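
[Editor example] The Part fun introduced above partitions MsgIdsBig round-robin by index modulo 3, building each sublist in reverse, so each removal pass takes every third message starting from the young end. A quick standalone illustration of the same fun on a small list (values chosen only for the example):

    Part = fun PartFun([], _, Acc) -> Acc;
               PartFun([E|Tail], N, Acc) ->
                   Pos = 1 + (N rem 3),
                   PartFun(Tail, N + 1, setelement(Pos, Acc, [E|element(Pos, Acc)]))
           end,
    %% Each partition comes back youngest-first:
    {[7,4,1], [8,5,2], [9,6,3]} = Part(lists:seq(1, 9), 0, {[], [], []}).
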
-test_msg_store_confirms(MsgIds, Cap, MSCState) ->
+test_msg_store_confirms(MsgIds, Cap, GenRef, MSCState) ->
     %% write -> confirmed
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = on_disk_await(Cap, MsgIds),
+    MsgIds1 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds1, MSCState),
+    ok = on_disk_await(Cap, MsgIds1),
     %% remove -> _
-    ok = msg_store_remove(MsgIds, MSCState),
+    ok = msg_store_remove(MsgIds1, MSCState),
     ok = on_disk_await(Cap, []),
     %% write, remove -> confirmed
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = msg_store_remove(MsgIds, MSCState),
-    ok = on_disk_await(Cap, MsgIds),
+    MsgIds2 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds2, MSCState),
+    ok = msg_store_remove(MsgIds2, MSCState),
+    ok = on_disk_await(Cap, MsgIds2),
     %% write, remove, write -> confirmed, confirmed
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = msg_store_remove(MsgIds, MSCState),
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = on_disk_await(Cap, MsgIds ++ MsgIds),
+    MsgIds3 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds3, MSCState),
+    ok = msg_store_remove(MsgIds3, MSCState),
+    MsgIds4 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds4, MSCState),
+    ok = on_disk_await(Cap, MsgIds3 ++ MsgIds4),
     %% remove, write -> confirmed
-    ok = msg_store_remove(MsgIds, MSCState),
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = on_disk_await(Cap, MsgIds),
+    ok = msg_store_remove(MsgIds4, MSCState),
+    MsgIds5 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds5, MSCState),
+    ok = on_disk_await(Cap, MsgIds5),
     %% remove, write, remove -> confirmed
-    ok = msg_store_remove(MsgIds, MSCState),
-    ok = msg_store_write(MsgIds, MSCState),
-    ok = msg_store_remove(MsgIds, MSCState),
-    ok = on_disk_await(Cap, MsgIds),
+    ok = msg_store_remove(MsgIds5, MSCState),
+    MsgIds6 = [{GenRef(), MsgId} || {_, MsgId} <- MsgIds],
+    ok = msg_store_write(MsgIds6, MSCState),
+    ok = msg_store_remove(MsgIds6, MSCState),
+    ok = on_disk_await(Cap, MsgIds6),
     %% confirmation on timer-based sync
-    passed = test_msg_store_confirm_timer(),
+    passed = test_msg_store_confirm_timer(GenRef),
     passed.
 
-test_msg_store_confirm_timer() ->
+test_msg_store_confirm_timer(GenRef) ->
     Ref = rabbit_guid:gen(),
     MsgId  = msg_id_bin(1),
     Self = self(),
@@ -485,32 +503,34 @@ test_msg_store_confirm_timer() ->
                        false -> ok
                    end
           end),
-    ok = msg_store_write([MsgId], MSCState),
-    ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], MSCState, false),
-    ok = msg_store_remove([MsgId], MSCState),
+    MsgIdsChecked = [{GenRef(), MsgId}],
+    ok = msg_store_write(MsgIdsChecked, MSCState),
+    ok = msg_store_keep_busy_until_confirm([msg_id_bin(2)], GenRef, MSCState, false),
+    ok = msg_store_remove(MsgIdsChecked, MSCState),
     ok = rabbit_msg_store:client_delete_and_terminate(MSCState),
     passed.
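
[Editor example] The GenRef/GenRefFun helpers threaded through these tests are just per-key monotonic counters kept in the process dictionary, standing in for a queue's SeqId so that every write gets a fresh message ref. Their behavior in isolation (illustrative only):

    %% Per-key counter in the process dictionary, as used for GenRef/GenRefFun.
    GenRefFun = fun(Key) ->
                    V = case get(Key) of undefined -> 0; V0 -> V0 end,
                    put(Key, V + 1),
                    V
                end,
    0 = GenRefFun(msc),   %% first ref for the msc counter
    1 = GenRefFun(msc),   %% increments on each call
    0 = GenRefFun(msc2).  %% independent counter per key
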
-msg_store_keep_busy_until_confirm(MsgIds, MSCState, Blocked) ->
+msg_store_keep_busy_until_confirm(MsgIds, GenRef, MSCState, Blocked) ->
     After = case Blocked of
                 false -> 0;
                 true  -> ?MAX_WAIT
             end,
     Recurse = fun () -> msg_store_keep_busy_until_confirm(
-                          MsgIds, MSCState, credit_flow:blocked()) end,
+                          MsgIds, GenRef, MSCState, credit_flow:blocked()) end,
     receive
         on_disk            -> ok;
         {bump_credit, Msg} -> credit_flow:handle_bump_msg(Msg),
                               Recurse()
     after After ->
-            ok = msg_store_write_flow(MsgIds, MSCState),
-            ok = msg_store_remove(MsgIds, MSCState),
+            MsgIds1 = [{GenRef(), MsgId} || MsgId <- MsgIds],
+            ok = msg_store_write_flow(MsgIds1, MSCState),
+            ok = msg_store_remove(MsgIds1, MSCState),
             Recurse()
     end.
 
-test_msg_store_client_delete_and_terminate() ->
+test_msg_store_client_delete_and_terminate(GenRef) ->
     restart_msg_store_empty(),
-    MsgIds = [msg_id_bin(M) || M <- lists:seq(1, 10)],
+    MsgIds = [{GenRef(), msg_id_bin(M)} || M <- lists:seq(1, 10)],
     Ref = rabbit_guid:gen(),
     MSCState = msg_store_client_init(?PERSISTENT_MSG_STORE, Ref),
     ok = msg_store_write(MsgIds, MSCState),
@@ -1562,7 +1582,7 @@ queue_index_publish(SeqIds, Persistent, Qi) ->
                             MsgId, SeqId, rabbit_msg_store,
                             #message_properties{size = 10},
                             Persistent, infinity, QiN),
-                      ok = rabbit_msg_store:write(MsgId, MsgId, MSCState),
+                      ok = rabbit_msg_store:write(SeqId, MsgId, MsgId, MSCState),
                      {QiM, [{SeqId, MsgId} | SeqIdsMsgIdsAcc]}
               end, {Qi, []}, SeqIds),
     %% do this just to force all of the publishes through to the msg_store:
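
[Editor example] One detail of the new remove/2 path worth spelling out: it calls ets:update_counter/4 with a default object of {'', ?FLYING_IS_WRITTEN}. If the server already handled the write and deleted the flying entry, the default (with the write counted as handled, value 3) is inserted first and the remove flag lands the counter on 7, ?FLYING_IS_REMOVED, so flying_remove still processes the remove; ets fills in the real key in place of the '' placeholder. A sketch of that case, using the literal flag values from the patch (table and key names are illustrative):

    %% Remove arriving after the server already deleted the flying entry.
    T = ets:new(flying, [set]),
    Key = {client_ref, msg_ref},
    %% Entry absent: default {Key, 3} is inserted, then 3 + 4 = 7.
    7 = ets:update_counter(T, Key, 4, {'', 3}),
    [{Key, 7}] = ets:lookup(T, Key),
    true = ets:delete(T, Key).
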