summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl11
-rw-r--r--src/rabbit_msg_store.erl9
-rw-r--r--src/rabbit_variable_queue.erl21
3 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 8979062b9d..9521158a7d 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -330,7 +330,7 @@ record_current_channel_tx(ChPid, Txn) ->
%% that wasn't happening already)
store_ch_record((ch_record(ChPid))#cr{txn = Txn}).
-deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
+deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
State = #q{q = #amqqueue{name = QName},
active_consumers = ActiveConsumers,
blocked_consumers = BlockedConsumers}) ->
@@ -353,9 +353,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc,
{State2, ChAckTags1} =
case AckRequired of
true -> {State1, sets:add_element(AckTag, ChAckTags)};
- false ->
- {confirm_message_internal(Message#basic_message.guid,
- State1), ChAckTags}
+ false -> {confirm_message_internal(
+ Message#basic_message.guid,
+ State1), ChAckTags}
end,
NewC = C#cr{unsent_message_count = Count + 1,
acktags = ChAckTags1},
@@ -410,6 +410,8 @@ confirm_message_internal(Guid, State = #q { guid_to_channel = GTC,
msgs_on_disk = MOD,
msg_indices_on_disk = MIOD }) ->
case dict:find(Guid, GTC) of
+ {ok, {_, undefined}} ->
+ State;
{ok, {ChPid, MsgSeqNo}} ->
rabbit_channel:confirm(ChPid, MsgSeqNo),
State #q { guid_to_channel = dict:erase(Guid, GTC),
@@ -851,6 +853,7 @@ handle_cast({ack, Txn, AckTags, ChPid},
noreply(State#q{backing_queue_state = BQS1})
end;
+%% Called when variable queue gets ack from a consumer.
handle_cast({confirm_messages, Guids}, State) ->
noreply(lists:foldl(fun (Guid, State0) ->
confirm_message_internal(Guid, State0)
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 8fe029392d..445aff487d 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -36,7 +36,7 @@
-export([start_link/4, write/4, read/3, contains/2, remove/2, release/2,
sync/3, client_init/2, client_terminate/1,
client_delete_and_terminate/3, successfully_recovered_state/1,
- register_callback/2]).
+ register_sync_callback/2]).
-export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal
@@ -388,8 +388,8 @@ client_delete_and_terminate(CState, Server, Ref) ->
successfully_recovered_state(Server) ->
gen_server2:call(Server, successfully_recovered_state, infinity).
-register_callback(Server, Fun) ->
- gen_server2:call(Server, {register_callback, Fun}, infinity).
+register_sync_callback(Server, Fun) ->
+ gen_server2:call(Server, {register_sync_callback, Fun}, infinity).
%%----------------------------------------------------------------------------
%% Client-side-only helpers
@@ -611,9 +611,8 @@ handle_call({delete_client, CRef}, _From,
reply(ok,
State #msstate { client_refs = sets:del_element(CRef, ClientRefs) });
-handle_call({register_callback, Fun}, {Pid, _},
+handle_call({register_sync_callback, Fun}, {Pid, _},
State = #msstate { pid_to_fun = PTF }) ->
- rabbit_log:info("Registering callback ~p~n", [{Pid, Fun}]),
erlang:monitor(process, Pid),
reply(ok,
State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }).
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 454193c22f..7163834579 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -402,7 +402,14 @@ init(QueueName, IsDurable, Recover) ->
true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef);
false -> undefined
end,
- register_confirm_callback(?PERSISTENT_MSG_STORE),
+
+ Self = self(),
+ rabbit_msg_store:register_sync_callback(
+ ?PERSISTENT_MSG_STORE,
+ fun (Guids) ->
+ gen_server2:cast(Self, {msgs_written_to_disk, Guids})
+ end),
+
TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef),
State = #vqstate {
q1 = queue:new(),
@@ -1073,18 +1080,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
msg_store_clients = MSCState1 }}.
%%----------------------------------------------------------------------------
-%% Internal gubbins for confirms
-%%----------------------------------------------------------------------------
-
-register_confirm_callback(MessageStore) ->
- Self = self(),
- rabbit_msg_store:register_callback(
- MessageStore,
- fun (Guids) ->
- gen_server2:cast(Self, {msgs_written_to_disk, Guids})
- end).
-
-%%----------------------------------------------------------------------------
%% Internal gubbins for acks
%%----------------------------------------------------------------------------