diff options
| author | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 00:37:29 +0000 |
|---|---|---|
| committer | Matthew Sackman <matthew@rabbitmq.com> | 2011-01-14 00:37:29 +0000 |
| commit | 5632d3944614e2134567386de8253e17a4a91ef6 (patch) | |
| tree | eb60b8d10d936636edd48693bd6233ab76e35391 /src | |
| parent | 2d0409667e82d86c55a18acfe2fb3fb9de56260e (diff) | |
| download | rabbitmq-server-git-5632d3944614e2134567386de8253e17a4a91ef6.tar.gz | |
More testing needed, but this does seem to work
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 126 | ||||
| -rw-r--r-- | src/rabbit_msg_store_gc.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_tests.erl | 20 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 31 |
4 files changed, 136 insertions, 63 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index f8b41ed388..e3f7cc8f53 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -34,8 +34,8 @@ -behaviour(gen_server2). -export([start_link/4, successfully_recovered_state/1, - client_init/3, client_terminate/1, client_delete_and_terminate/1, - client_ref/1, + client_init/4, client_terminate/1, client_delete_and_terminate/1, + client_ref/1, close_all_indicated/1, write/3, read/2, contains/2, remove/2, release/2, sync/3]). -export([sync/1, set_maximum_since_use/2, @@ -82,7 +82,8 @@ dedup_cache_ets, %% tid of dedup cache table cur_file_cache_ets, %% tid of current file cache table dying_clients, %% set of dying clients - client_refs, %% set of references of all registered clients + client_refs, %% map of references of all registered clients + %% mapping to close_fds_fun callbacks successfully_recovered, %% boolean: did we recover state? file_size_limit, %% how big are our files allowed to get? client_ondisk_callback, %% client ref to callback function mapping @@ -111,6 +112,7 @@ index_module, index_state, file_summary_ets, + file_handles_ets, msg_store }). @@ -124,6 +126,7 @@ index_module :: atom(), index_state :: any(), file_summary_ets :: ets:tid(), + file_handles_ets :: ets:tid(), msg_store :: server() }). @@ -146,13 +149,15 @@ {(fun ((A) -> 'finished' | {rabbit_guid:guid(), non_neg_integer(), A})), A}). -type(maybe_guid_fun() :: 'undefined' | fun ((gb_set()) -> any())). +-type(maybe_close_fds_fun() :: 'undefined' | fun (() -> 'ok')). +-type(deletion_thunk() :: fun (() -> 'ok' | deletion_thunk())). -spec(start_link/4 :: (atom(), file:filename(), [binary()] | 'undefined', startup_fun_state()) -> rabbit_types:ok_pid_or_error()). -spec(successfully_recovered_state/1 :: (server()) -> boolean()). --spec(client_init/3 :: (server(), client_ref(), maybe_guid_fun()) -> - client_msstate()). +-spec(client_init/4 :: (server(), client_ref(), maybe_guid_fun(), + maybe_close_fds_fun()) -> client_msstate()). -spec(client_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_delete_and_terminate/1 :: (client_msstate()) -> 'ok'). -spec(client_ref/1 :: (client_msstate()) -> client_ref()). @@ -169,8 +174,9 @@ -spec(set_maximum_since_use/2 :: (server(), non_neg_integer()) -> 'ok'). -spec(has_readers/2 :: (non_neg_integer(), gc_state()) -> boolean()). -spec(combine_files/3 :: (non_neg_integer(), non_neg_integer(), gc_state()) -> - 'ok'). --spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> 'ok'). + 'ok' | deletion_thunk()). +-spec(delete_file/2 :: (non_neg_integer(), gc_state()) -> + 'ok' | deletion_thunk()). -endif. @@ -399,11 +405,11 @@ start_link(Server, Dir, ClientRefs, StartupFunState) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -client_init(Server, Ref, MsgOnDiskFun) -> +client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) -> {IState, IModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts} = - gen_server2:call(Server, {new_client_state, Ref, MsgOnDiskFun}, - infinity), + gen_server2:call( + Server, {new_client_state, Ref, MsgOnDiskFun, CloseFDsFun}, infinity), #client_msstate { server = Server, client_ref = Ref, file_handle_cache = dict:new(), @@ -523,7 +529,8 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, CState = #client_msstate { file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, dedup_cache_ets = DedupCacheEts, - gc_pid = GCPid }) -> + gc_pid = GCPid, + client_ref = Ref }) -> Release = fun() -> ok = case ets:update_counter(FileSummaryEts, File, {#file_summary.readers, -1}) of @@ -570,7 +577,7 @@ client_read3(#msg_location { guid = Guid, file = File }, Defer, case index_lookup(Guid, CState) of #msg_location { file = File } = MsgLocation -> %% Still the same file. - mark_handle_open(FileHandlesEts, File), + mark_handle_open(FileHandlesEts, File, Ref), CState1 = close_all_indicated(CState), {Msg, CState2} = %% This will never be the current file @@ -684,6 +691,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> index_module = IndexModule, index_state = IndexState, file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, msg_store = self() }), @@ -694,10 +702,10 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) -> prioritise_call(Msg, _From, _State) -> case Msg of - successfully_recovered_state -> 7; - {new_client_state, _Ref, _MODC} -> 7; - {read, _Guid} -> 2; - _ -> 0 + successfully_recovered_state -> 7; + {new_client_state, _Ref, _MODC, _CloseFDsFun} -> 7; + {read, _Guid} -> 2; + _ -> 0 end. prioritise_cast(Msg, _State) -> @@ -713,7 +721,7 @@ prioritise_cast(Msg, _State) -> handle_call(successfully_recovered_state, _From, State) -> reply(State #msstate.successfully_recovered, State); -handle_call({new_client_state, CRef, Callback}, _From, +handle_call({new_client_state, CRef, MsgOnDiskFun, CloseFDsFun}, _From, State = #msstate { dir = Dir, index_state = IndexState, index_module = IndexModule, @@ -724,13 +732,14 @@ handle_call({new_client_state, CRef, Callback}, _From, client_refs = ClientRefs, client_ondisk_callback = CODC, gc_pid = GCPid }) -> - CODC1 = case Callback of + CODC1 = case MsgOnDiskFun of undefined -> CODC; - _ -> dict:store(CRef, Callback, CODC) + _ -> dict:store(CRef, MsgOnDiskFun, CODC) end, + ClientRefs1 = dict:store(CRef, CloseFDsFun, ClientRefs), reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts, DedupCacheEts, CurFileCacheEts}, - State #msstate { client_refs = sets:add_element(CRef, ClientRefs), + State #msstate { client_refs = ClientRefs1, client_ondisk_callback = CODC1 }); handle_call({client_terminate, CRef}, _From, @@ -755,7 +764,7 @@ handle_cast({client_delete, CRef}, dying_clients = DyingClients }) -> State1 = clear_client_callback( CRef, State #msstate { - client_refs = sets:del_element(CRef, ClientRefs), + client_refs = dict:erase(CRef, ClientRefs), dying_clients = sets:del_element(CRef, DyingClients) }), noreply(remove_message(CRef, CRef, State1)); @@ -832,10 +841,11 @@ handle_cast(sync, State) -> handle_cast({combine_files, Source, Destination, Reclaimed}, State = #msstate { sum_file_size = SumFileSize, file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + file_summary_ets = FileSummaryEts, + client_refs = ClientRefs }) -> ok = cleanup_after_file_deletion(Source, State), %% see comment in cleanup_after_file_deletion - true = mark_handle_to_close(FileHandlesEts, Destination), + true = mark_handle_to_close(ClientRefs, FileHandlesEts, Destination, false), true = ets:update_element(FileSummaryEts, Destination, {#file_summary.locked, false}), State1 = State #msstate { sum_file_size = SumFileSize - Reclaimed }, @@ -881,7 +891,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState, [ets:delete(T) || T <- [FileSummaryEts, DedupCacheEts, FileHandlesEts, CurFileCacheEts]], IndexModule:terminate(IndexState), - store_recovery_terms([{client_refs, sets:to_list(ClientRefs)}, + store_recovery_terms([{client_refs, dict:fetch_keys(ClientRefs)}, {index_module, IndexModule}], Dir), State3 #msstate { index_state = undefined, current_file_handle = undefined }. @@ -1221,29 +1231,48 @@ close_handle(Key, FHC) -> error -> FHC end. -mark_handle_open(FileHandlesEts, File) -> +mark_handle_open(FileHandlesEts, File, Ref) -> %% This is fine to fail (already exists) - ets:insert_new(FileHandlesEts, {{self(), File}, open}), + ets:insert_new(FileHandlesEts, {{Ref, File}, open}), true. -mark_handle_to_close(FileHandlesEts, File) -> - [ ets:update_element(FileHandlesEts, Key, {2, close}) - || {Key, open} <- ets:match_object(FileHandlesEts, {{'_', File}, open}) ], +mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) -> + [ begin + ets:update_element(FileHandlesEts, Key, {2, close}), + case Invoke of + true -> case dict:fetch(Ref, ClientRefs) of + undefined -> ok; + CloseFDsFun -> ok = CloseFDsFun() + end; + false -> ok + end + end || + {{Ref, _File} = Key, open} <- + ets:match_object(FileHandlesEts, {{'_', File}, open}) ], true. -close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts } = +safe_file_delete(FileHandlesEts, File, Dir) -> + case ets:match_object(FileHandlesEts, {{'_', File}, open}, 1) of + {[_|_], _Cont} -> + fun () -> safe_file_delete(FileHandlesEts, File, Dir) end; + _ -> + ok = file:delete(form_filename(Dir, filenum_to_name(File))) + end. + +close_all_indicated(#client_msstate { file_handles_ets = FileHandlesEts, + client_ref = Ref } = CState) -> - Objs = ets:match_object(FileHandlesEts, {{self(), '_'}, close}), - lists:foldl(fun ({Key = {_Self, File}, close}, CStateM) -> + Objs = ets:match_object(FileHandlesEts, {{Ref, '_'}, close}), + lists:foldl(fun ({Key = {_Ref, File}, close}, CStateM) -> true = ets:delete(FileHandlesEts, Key), close_handle(File, CStateM) end, CState, Objs). -close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, - file_handle_cache = FHC }) -> - Self = self(), +close_all_handles(CState = #client_msstate { file_handles_ets = FileHandlesEts, + file_handle_cache = FHC, + client_ref = Ref }) -> ok = dict:fold(fun (File, Hdl, ok) -> - true = ets:delete(FileHandlesEts, {Self, File}), + true = ets:delete(FileHandlesEts, {Ref, File}), file_handle_cache:close(Hdl) end, ok, FHC), CState #client_msstate { file_handle_cache = dict:new() }; @@ -1381,16 +1410,16 @@ index_delete_by_file(File, #msstate { index_module = Index, %%---------------------------------------------------------------------------- recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) -> - {false, IndexModule:new(Dir), sets:new()}; + {false, IndexModule:new(Dir), dict:new()}; recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) -> rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]), - {false, IndexModule:new(Dir), sets:new()}; + {false, IndexModule:new(Dir), dict:new()}; recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> Fresh = fun (ErrorMsg, ErrorArgs) -> rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n" "rebuilding indices from scratch~n", [Server | ErrorArgs]), - {false, IndexModule:new(Dir), sets:new()} + {false, IndexModule:new(Dir), dict:new()} end, case read_recovery_terms(Dir) of {false, Error} -> @@ -1403,7 +1432,8 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) -> true -> case IndexModule:recover(Dir) of {ok, IndexState1} -> {true, IndexState1, - sets:from_list(ClientRefs)}; + dict:from_list( + [{CRef, undefined} || CRef <- ClientRefs])}; {error, Error} -> Fresh("failed to recover index: ~p", [Error]) end; @@ -1744,7 +1774,8 @@ delete_file_if_empty(File, State = #msstate { cleanup_after_file_deletion(File, #msstate { file_handles_ets = FileHandlesEts, - file_summary_ets = FileSummaryEts }) -> + file_summary_ets = FileSummaryEts, + client_refs = ClientRefs }) -> %% Ensure that any clients that have open fhs to the file close %% them before using them again. This has to be done here (given %% it's done in the msg_store, and not the gc), and not when @@ -1752,7 +1783,7 @@ cleanup_after_file_deletion(File, %% the client could find the close, and close and reopen the fh, %% whilst the GC is waiting for readers to disappear, before it's %% actually done the GC. - true = mark_handle_to_close(FileHandlesEts, File), + true = mark_handle_to_close(ClientRefs, FileHandlesEts, File, true), [#file_summary { left = Left, right = Right, locked = true, @@ -1781,6 +1812,7 @@ has_readers(File, #gc_state { file_summary_ets = FileSummaryEts }) -> combine_files(Source, Destination, State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> [#file_summary { @@ -1841,7 +1873,7 @@ combine_files(Source, Destination, SourceHdl, DestinationHdl, Destination, State), %% tidy up ok = file_handle_cache:close(DestinationHdl), - ok = file_handle_cache:delete(SourceHdl), + ok = file_handle_cache:close(SourceHdl), %% don't update dest.right, because it could be changing at the %% same time @@ -1851,9 +1883,11 @@ combine_files(Source, Destination, {#file_summary.file_size, TotalValidData}]), Reclaimed = SourceFileSize + DestinationFileSize - TotalValidData, - gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}). + gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), + safe_file_delete(FileHandlesEts, Source, Dir). delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, + file_handles_ets = FileHandlesEts, dir = Dir, msg_store = Server }) -> [#file_summary { valid_total_size = 0, @@ -1861,8 +1895,8 @@ delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_size = FileSize, readers = 0 }] = ets:lookup(FileSummaryEts, File), {[], 0} = load_and_vacuum_message_file(File, State), - ok = file:delete(form_filename(Dir, filenum_to_name(File))), - gen_server2:cast(Server, {delete_file, File, FileSize}). + gen_server2:cast(Server, {delete_file, File, FileSize}), + safe_file_delete(FileHandlesEts, File, Dir). load_and_vacuum_message_file(File, #gc_state { dir = Dir, index_module = Index, diff --git a/src/rabbit_msg_store_gc.erl b/src/rabbit_msg_store_gc.erl index cd9fd4973f..0baf992b9d 100644 --- a/src/rabbit_msg_store_gc.erl +++ b/src/rabbit_msg_store_gc.erl @@ -42,6 +42,7 @@ -record(state, { pending_no_readers, + on_action, msg_store_state }). @@ -89,6 +90,7 @@ init([MsgStoreState]) -> ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use, [self()]), {ok, #state { pending_no_readers = dict:new(), + on_action = [], msg_store_state = MsgStoreState }, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. @@ -131,16 +133,30 @@ code_change(_OldVsn, State, _Extra) -> attempt_action(Action, Files, State = #state { pending_no_readers = Pending, + on_action = Thunks, msg_store_state = MsgStoreState }) -> + Thunks1 = run_thunks(Thunks), case [File || File <- Files, rabbit_msg_store:has_readers(File, MsgStoreState)] of - [] -> do_action(Action, Files, MsgStoreState), - State; + [] -> Thunks2 = case do_action(Action, Files, MsgStoreState) of + ok -> Thunks1; + Thunk -> [Thunk | Thunks1] + end, + State #state { on_action = Thunks2 }; [File | _] -> Pending1 = dict:store(File, {Action, Files}, Pending), - State #state { pending_no_readers = Pending1 } + State #state { pending_no_readers = Pending1, + on_action = Thunks1 } end. do_action(combine, [Source, Destination], MsgStoreState) -> rabbit_msg_store:combine_files(Source, Destination, MsgStoreState); do_action(delete, [File], MsgStoreState) -> rabbit_msg_store:delete_file(File, MsgStoreState). + +run_thunks(Thunks) -> + lists:foldl(fun (Thunk, Thunks1) -> + case Thunk() of + ok -> Thunks1; + Thunk1 -> [Thunk1 | Thunks1] + end + end, [], Thunks). diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl index d913092cce..befcfd99ad 100644 --- a/src/rabbit_tests.erl +++ b/src/rabbit_tests.erl @@ -1502,12 +1502,13 @@ msg_store_remove(MsgStore, Ref, Guids) -> with_msg_store_client(MsgStore, Ref, Fun) -> rabbit_msg_store:client_terminate( - Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined))). + Fun(rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined))). foreach_with_msg_store_client(MsgStore, Ref, Fun, L) -> rabbit_msg_store:client_terminate( - lists:foldl(fun (Guid, MSCState) -> Fun(Guid, MSCState) end, - rabbit_msg_store:client_init(MsgStore, Ref, undefined), L)). + lists:foldl( + fun (Guid, MSCState) -> Fun(Guid, MSCState) end, + rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined), L)). test_msg_store() -> restart_msg_store_empty(), @@ -1516,7 +1517,7 @@ test_msg_store() -> {Guids1stHalf, Guids2ndHalf} = lists:split(50, Guids), Ref = rabbit_guid:guid(), MSCState = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + undefined, undefined), %% check we don't contain any of the msgs we're about to publish false = msg_store_contains(false, Guids, MSCState), %% publish the first half @@ -1583,7 +1584,7 @@ test_msg_store() -> {Guid, 0, GuidsTail} end, Guids2ndHalf}), MSCState5 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + undefined, undefined), %% check we have the right msgs left lists:foldl( fun (Guid, Bool) -> @@ -1593,7 +1594,7 @@ test_msg_store() -> %% restart empty restart_msg_store_empty(), MSCState6 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + undefined, undefined), %% check we don't contain any of the msgs false = msg_store_contains(false, Guids, MSCState6), %% publish the first half again @@ -1602,7 +1603,7 @@ test_msg_store() -> ok = rabbit_msg_store:client_terminate( msg_store_read(Guids1stHalf, MSCState6)), MSCState7 = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, Ref, - undefined), + undefined, undefined), ok = rabbit_msg_store:remove(Guids1stHalf, MSCState7), ok = rabbit_msg_store:client_terminate(MSCState7), %% restart empty @@ -1661,7 +1662,7 @@ init_test_queue() -> Terms = rabbit_queue_index:shutdown_terms(TestQueue), PRef = proplists:get_value(persistent_ref, Terms, rabbit_guid:guid()), PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef, undefined), + PRef, undefined, undefined), Res = rabbit_queue_index:recover( TestQueue, Terms, false, fun (Guid) -> @@ -1695,7 +1696,8 @@ queue_index_publish(SeqIds, Persistent, Qi) -> true -> ?PERSISTENT_MSG_STORE; false -> ?TRANSIENT_MSG_STORE end, - MSCState = rabbit_msg_store:client_init(MsgStore, Ref, undefined), + MSCState = + rabbit_msg_store:client_init(MsgStore, Ref, undefined, undefined), {A, B = [{_SeqId, LastGuidWritten} | _]} = lists:foldl( fun (SeqId, {QiN, SeqIdsGuidsAcc}) -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 665cac96d9..8c048575a8 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -436,10 +436,12 @@ init(QueueName, true, true, MsgOnDiskFun, MsgIdxOnDiskFun) -> Terms}; _ -> {rabbit_guid:guid(), rabbit_guid:guid(), []} end, - PersistentClient = rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, - PRef, MsgOnDiskFun), - TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, - TRef, undefined), + PersistentClient = rabbit_msg_store:client_init( + ?PERSISTENT_MSG_STORE, PRef, MsgOnDiskFun, + msg_store_close_fds_fun(true)), + TransientClient = rabbit_msg_store:client_init( + ?TRANSIENT_MSG_STORE, TRef, undefined, + msg_store_close_fds_fun(false)), {DeltaCount, IndexState} = rabbit_queue_index:recover( QueueName, Terms1, @@ -933,7 +935,9 @@ with_immutable_msg_store_state(MSCState, IsPersistent, Fun) -> Res. msg_store_client_init(MsgStore, MsgOnDiskFun) -> - rabbit_msg_store:client_init(MsgStore, rabbit_guid:guid(), MsgOnDiskFun). + rabbit_msg_store:client_init( + MsgStore, rabbit_guid:guid(), MsgOnDiskFun, + msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE)). msg_store_write(MSCState, IsPersistent, Guid, Msg) -> with_immutable_msg_store_state( @@ -960,6 +964,23 @@ msg_store_sync(MSCState, IsPersistent, Guids, Callback) -> MSCState, IsPersistent, fun (MSCState1) -> rabbit_msg_store:sync(Guids, Callback, MSCState1) end). +msg_store_close_fds(MSCState, IsPersistent) -> + with_msg_store_state( + MSCState, IsPersistent, + fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end). + +msg_store_close_fds_fun(IsPersistent) -> + Self = self(), + fun () -> + rabbit_amqqueue:maybe_run_queue_via_backing_queue_async( + Self, + fun (State = #vqstate { msg_store_clients = MSCState }) -> + {[], State #vqstate { msg_store_clients = + msg_store_close_fds( + MSCState, IsPersistent) }} + end) + end. + maybe_write_delivered(false, _SeqId, IndexState) -> IndexState; maybe_write_delivered(true, SeqId, IndexState) -> |
