diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 9 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 21 |
3 files changed, 19 insertions, 22 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 8979062b9d..9521158a7d 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -330,7 +330,7 @@ record_current_channel_tx(ChPid, Txn) -> %% that wasn't happening already) store_ch_record((ch_record(ChPid))#cr{txn = Txn}). -deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, +deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc, State = #q{q = #amqqueue{name = QName}, active_consumers = ActiveConsumers, blocked_consumers = BlockedConsumers}) -> @@ -353,9 +353,9 @@ deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun }, FunAcc, {State2, ChAckTags1} = case AckRequired of true -> {State1, sets:add_element(AckTag, ChAckTags)}; - false -> - {confirm_message_internal(Message#basic_message.guid, - State1), ChAckTags} + false -> {confirm_message_internal( + Message#basic_message.guid, + State1), ChAckTags} end, NewC = C#cr{unsent_message_count = Count + 1, acktags = ChAckTags1}, @@ -410,6 +410,8 @@ confirm_message_internal(Guid, State = #q { guid_to_channel = GTC, msgs_on_disk = MOD, msg_indices_on_disk = MIOD }) -> case dict:find(Guid, GTC) of + {ok, {_, undefined}} -> + State; {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo), State #q { guid_to_channel = dict:erase(Guid, GTC), @@ -851,6 +853,7 @@ handle_cast({ack, Txn, AckTags, ChPid}, noreply(State#q{backing_queue_state = BQS1}) end; +%% Called when variable queue gets ack from a consumer. handle_cast({confirm_messages, Guids}, State) -> noreply(lists:foldl(fun (Guid, State0) -> confirm_message_internal(Guid, State0) diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 8fe029392d..445aff487d 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -36,7 +36,7 @@ -export([start_link/4, write/4, read/3, contains/2, remove/2, release/2, sync/3, client_init/2, client_terminate/1, client_delete_and_terminate/3, successfully_recovered_state/1, - register_callback/2]). + register_sync_callback/2]). -export([sync/1, gc_done/4, set_maximum_since_use/2, gc/3]). %% internal @@ -388,8 +388,8 @@ client_delete_and_terminate(CState, Server, Ref) -> successfully_recovered_state(Server) -> gen_server2:call(Server, successfully_recovered_state, infinity). -register_callback(Server, Fun) -> - gen_server2:call(Server, {register_callback, Fun}, infinity). +register_sync_callback(Server, Fun) -> + gen_server2:call(Server, {register_sync_callback, Fun}, infinity). %%---------------------------------------------------------------------------- %% Client-side-only helpers @@ -611,9 +611,8 @@ handle_call({delete_client, CRef}, _From, reply(ok, State #msstate { client_refs = sets:del_element(CRef, ClientRefs) }); -handle_call({register_callback, Fun}, {Pid, _}, +handle_call({register_sync_callback, Fun}, {Pid, _}, State = #msstate { pid_to_fun = PTF }) -> - rabbit_log:info("Registering callback ~p~n", [{Pid, Fun}]), erlang:monitor(process, Pid), reply(ok, State #msstate { pid_to_fun = dict:store(Pid, Fun, PTF) }). diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 454193c22f..7163834579 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -402,7 +402,14 @@ init(QueueName, IsDurable, Recover) -> true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); false -> undefined end, - register_confirm_callback(?PERSISTENT_MSG_STORE), + + Self = self(), + rabbit_msg_store:register_sync_callback( + ?PERSISTENT_MSG_STORE, + fun (Guids) -> + gen_server2:cast(Self, {msgs_written_to_disk, Guids}) + end), + TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -1073,18 +1080,6 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, msg_store_clients = MSCState1 }}. %%---------------------------------------------------------------------------- -%% Internal gubbins for confirms -%%---------------------------------------------------------------------------- - -register_confirm_callback(MessageStore) -> - Self = self(), - rabbit_msg_store:register_callback( - MessageStore, - fun (Guids) -> - gen_server2:cast(Self, {msgs_written_to_disk, Guids}) - end). - -%%---------------------------------------------------------------------------- %% Internal gubbins for acks %%---------------------------------------------------------------------------- |
