diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 24 |
1 files changed, 16 insertions, 8 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e90e1281b5..21a499c5e6 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -441,7 +441,8 @@ contains(MsgId, CState) -> server_call(CState, {contains, MsgId}). remove([], _CState) -> ok; remove(MsgIds, CState = #client_msstate { client_ref = CRef }) -> server_cast(CState, {remove, CRef, MsgIds}). -sync(MsgIds, K, CState) -> server_cast(CState, {sync, MsgIds, K}). +sync(MsgIds, K, CState = #client_msstate { client_ref = CRef }) -> + server_cast(CState, {sync, CRef, MsgIds, K}). set_maximum_since_use(Server, Age) -> gen_server2:cast(Server, {set_maximum_since_use, Age}). @@ -760,19 +761,26 @@ handle_cast({remove, CRef, MsgIds}, State) -> noreply(maybe_compact(client_confirm(CRef, gb_sets:from_list(MsgIds), removed, State1))); -handle_cast({sync, MsgIds, K}, +handle_cast({sync, CRef, MsgIds, K}, State = #msstate { current_file = CurFile, current_file_handle = CurHdl, - on_sync = Syncs }) -> - {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), - case lists:any(fun (MsgId) -> + on_sync = Syncs, + dying_clients = DyingClients }) -> + case sets:is_element(CRef, DyingClients) of + true -> + noreply(State); + false -> + {ok, SyncOffset} = file_handle_cache:last_sync_offset(CurHdl), + case lists:any( + fun (MsgId) -> #msg_location { file = File, offset = Offset } = index_lookup(MsgId, State), File =:= CurFile andalso Offset >= SyncOffset end, MsgIds) of - false -> K(), - noreply(State); - true -> noreply(State #msstate { on_sync = [K | Syncs] }) + false -> K(), + noreply(State); + true -> noreply(State #msstate { on_sync = [K | Syncs] }) + end end; handle_cast({combine_files, Source, Destination, Reclaimed}, |
