diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_msg_store.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 12 |
2 files changed, 28 insertions, 14 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e75f5655c6..28eb8213de 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -21,7 +21,7 @@ -export([start_link/4, successfully_recovered_state/1, client_init/4, client_terminate/1, client_delete_and_terminate/1, client_ref/1, close_all_indicated/1, - write/3, read/2, contains/2, remove/2]). + write/3, write_flow/3, read/2, contains/2, remove/2]). -export([set_maximum_since_use/2, has_readers/2, combine_files/3, delete_file/2]). %% internal @@ -152,6 +152,7 @@ -spec(close_all_indicated/1 :: (client_msstate()) -> rabbit_types:ok(client_msstate())). -spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). +-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok'). -spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) -> {rabbit_types:ok(msg()) | 'not_found', client_msstate()}). -spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()). @@ -461,14 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) -> client_ref(#client_msstate { client_ref = Ref }) -> Ref. -write(MsgId, Msg, - CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, - client_ref = CRef, - server = Server }) -> - ok = client_update_flying(+1, MsgId, CState), - ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), +write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) -> credit_flow:send(whereis(Server)), - ok = server_cast(CState, {write, CRef, MsgId}). + client_write(MsgId, Msg, flow, CState). + +write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState). read(MsgId, CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) -> @@ -503,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) -> server_cast(#client_msstate { server = Server }, Msg) -> gen_server2:cast(Server, Msg). +client_write(MsgId, Msg, Flow, + CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts, + client_ref = CRef }) -> + ok = client_update_flying(+1, MsgId, CState), + ok = update_msg_cache(CurFileCacheEts, MsgId, Msg), + ok = server_cast(CState, {write, CRef, MsgId, Flow}). + client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer, CState = #client_msstate { file_summary_ets = FileSummaryEts }) -> case ets:lookup(FileSummaryEts, File) of @@ -798,11 +803,14 @@ handle_cast({client_delete, CRef}, State1 = State #msstate { clients = dict:erase(CRef, Clients) }, noreply(remove_message(CRef, CRef, clear_client(CRef, State1))); -handle_cast({write, CRef, MsgId}, +handle_cast({write, CRef, MsgId, Flow}, State = #msstate { cur_file_cache_ets = CurFileCacheEts, clients = Clients }) -> - {CPid, _, _} = dict:fetch(CRef, Clients), - credit_flow:ack(CPid), + case Flow of + flow -> {CPid, _, _} = dict:fetch(CRef, Clients), + credit_flow:ack(CPid); + noflow -> ok + end, true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}), case update_flying(-1, MsgId, CRef, State) of process -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 63a0927f77..9b45b55852 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -870,17 +870,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) -> msg_store_write(MSCState, IsPersistent, MsgId, Msg) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:write_flow(MsgId, Msg, MSCState1) + end). msg_store_read(MSCState, IsPersistent, MsgId) -> with_msg_store_state( MSCState, IsPersistent, - fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end). + fun (MSCState1) -> + rabbit_msg_store:read(MsgId, MSCState1) + end). msg_store_remove(MSCState, IsPersistent, MsgIds) -> with_immutable_msg_store_state( MSCState, IsPersistent, - fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end). + fun (MCSState1) -> + rabbit_msg_store:remove(MsgIds, MCSState1) + end). msg_store_close_fds(MSCState, IsPersistent) -> with_msg_store_state( |
