summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/rabbit_msg_store.erl30
-rw-r--r--src/rabbit_variable_queue.erl12
2 files changed, 28 insertions, 14 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index e75f5655c6..28eb8213de 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -21,7 +21,7 @@
-export([start_link/4, successfully_recovered_state/1,
client_init/4, client_terminate/1, client_delete_and_terminate/1,
client_ref/1, close_all_indicated/1,
- write/3, read/2, contains/2, remove/2]).
+ write/3, write_flow/3, read/2, contains/2, remove/2]).
-export([set_maximum_since_use/2, has_readers/2, combine_files/3,
delete_file/2]). %% internal
@@ -152,6 +152,7 @@
-spec(close_all_indicated/1 ::
(client_msstate()) -> rabbit_types:ok(client_msstate())).
-spec(write/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
+-spec(write_flow/3 :: (rabbit_types:msg_id(), msg(), client_msstate()) -> 'ok').
-spec(read/2 :: (rabbit_types:msg_id(), client_msstate()) ->
{rabbit_types:ok(msg()) | 'not_found', client_msstate()}).
-spec(contains/2 :: (rabbit_types:msg_id(), client_msstate()) -> boolean()).
@@ -461,14 +462,11 @@ client_delete_and_terminate(CState = #client_msstate { client_ref = Ref }) ->
client_ref(#client_msstate { client_ref = Ref }) -> Ref.
-write(MsgId, Msg,
- CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
- client_ref = CRef,
- server = Server }) ->
- ok = client_update_flying(+1, MsgId, CState),
- ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+write_flow(MsgId, Msg, CState = #client_msstate { server = Server }) ->
credit_flow:send(whereis(Server)),
- ok = server_cast(CState, {write, CRef, MsgId}).
+ client_write(MsgId, Msg, flow, CState).
+
+write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).
read(MsgId,
CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts }) ->
@@ -503,6 +501,13 @@ server_call(#client_msstate { server = Server }, Msg) ->
server_cast(#client_msstate { server = Server }, Msg) ->
gen_server2:cast(Server, Msg).
+client_write(MsgId, Msg, Flow,
+ CState = #client_msstate { cur_file_cache_ets = CurFileCacheEts,
+ client_ref = CRef }) ->
+ ok = client_update_flying(+1, MsgId, CState),
+ ok = update_msg_cache(CurFileCacheEts, MsgId, Msg),
+ ok = server_cast(CState, {write, CRef, MsgId, Flow}).
+
client_read1(#msg_location { msg_id = MsgId, file = File } = MsgLocation, Defer,
CState = #client_msstate { file_summary_ets = FileSummaryEts }) ->
case ets:lookup(FileSummaryEts, File) of
@@ -798,11 +803,14 @@ handle_cast({client_delete, CRef},
State1 = State #msstate { clients = dict:erase(CRef, Clients) },
noreply(remove_message(CRef, CRef, clear_client(CRef, State1)));
-handle_cast({write, CRef, MsgId},
+handle_cast({write, CRef, MsgId, Flow},
State = #msstate { cur_file_cache_ets = CurFileCacheEts,
clients = Clients }) ->
- {CPid, _, _} = dict:fetch(CRef, Clients),
- credit_flow:ack(CPid),
+ case Flow of
+ flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ credit_flow:ack(CPid);
+ noflow -> ok
+ end,
true = 0 =< ets:update_counter(CurFileCacheEts, MsgId, {3, -1}),
case update_flying(-1, MsgId, CRef, State) of
process ->
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 63a0927f77..9b45b55852 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -870,17 +870,23 @@ msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:write(MsgId, Msg, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
+ end).
msg_store_read(MSCState, IsPersistent, MsgId) ->
with_msg_store_state(
MSCState, IsPersistent,
- fun (MSCState1) -> rabbit_msg_store:read(MsgId, MSCState1) end).
+ fun (MSCState1) ->
+ rabbit_msg_store:read(MsgId, MSCState1)
+ end).
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
with_immutable_msg_store_state(
MSCState, IsPersistent,
- fun (MCSState1) -> rabbit_msg_store:remove(MsgIds, MCSState1) end).
+ fun (MCSState1) ->
+ rabbit_msg_store:remove(MsgIds, MCSState1)
+ end).
msg_store_close_fds(MSCState, IsPersistent) ->
with_msg_store_state(