diff options
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 54 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 22 |
3 files changed, 53 insertions, 25 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index f3fce61a03..8bed40bef4 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -61,7 +61,9 @@ rate_timer_ref, expiry_timer_ref, stats_timer, - guid_to_channel + guid_to_channel, + msgs_on_disk, + msg_indices_on_disk }). -record(consumer, {tag, ack_required}). @@ -124,7 +126,9 @@ init(Q) -> rate_timer_ref = undefined, expiry_timer_ref = undefined, stats_timer = rabbit_event:init_stats_timer(), - guid_to_channel = dict:new()}, hibernate, + guid_to_channel = dict:new(), + msgs_on_disk = gb_sets:new(), + msg_indices_on_disk = gb_sets:new()}, hibernate, {backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}. terminate(shutdown, State = #q{backing_queue = BQ}) -> @@ -402,21 +406,26 @@ deliver_from_queue_deliver(AckRequired, false, {{Message, IsDelivered, AckTag}, 0 == Remaining, State #q { backing_queue_state = BQS1 }}. -confirm_message_internal(Guid, State = #q{guid_to_channel = GTC}) -> +confirm_message_internal(Guid, State = #q { guid_to_channel = GTC, + msgs_on_disk = MOD, + msg_indices_on_disk = MIOD }) -> case dict:find(Guid, GTC) of {ok, {ChPid, MsgSeqNo}} -> rabbit_channel:confirm(ChPid, MsgSeqNo), - State#q{guid_to_channel = dict:erase(Guid, GTC)}; + State #q { guid_to_channel = dict:erase(Guid, GTC), + msgs_on_disk = gb_sets:delete_any(Guid, MOD), + msg_indices_on_disk = gb_sets:delete_any(Guid, MIOD) }; _ -> State end. maybe_record_confirm_message(undefined, _, _, State) -> State; -maybe_record_confirm_message(MsgSeqNo, Message, ChPid, State) -> - State#q{guid_to_channel = - dict:store(Message#basic_message.guid, {ChPid, MsgSeqNo}, - State#q.guid_to_channel)}. +maybe_record_confirm_message(MsgSeqNo, + #basic_message { guid = Guid }, + ChPid, State) -> + State #q { guid_to_channel = + dict:store(Guid, {ChPid, MsgSeqNo}, State#q.guid_to_channel) }. run_message_queue(State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> Funs = {fun deliver_from_queue_pred/2, @@ -847,6 +856,35 @@ handle_cast({confirm_messages, Guids}, State) -> confirm_message_internal(Guid, State0) end, State, Guids)); +handle_cast({msgs_written_to_disk, Guids}, + State = #q{msgs_on_disk = MOD, + msg_indices_on_disk = MIOD}) -> + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MIOD), + gb_sets:fold(fun (Guid, State0) -> + confirm_message_internal(Guid, State0) + end, State, ToConfirmMsgs), + noreply(State#q{msgs_on_disk = + gb_sets:difference(gb_sets:union(MOD, GuidSet), + ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(MIOD, ToConfirmMsgs)}); + +handle_cast({msg_indices_written_to_disk, Guids}, + State = #q{msgs_on_disk = MOD, + msg_indices_on_disk = MIOD}) -> + rabbit_log:info("Message indices written to disk: ~p~n", [Guids]), + GuidSet = gb_sets:from_list(Guids), + ToConfirmMsgs = gb_sets:intersection(GuidSet, MOD), + gb_sets:fold(fun (Guid, State0) -> + confirm_message_internal(Guid, State0) + end, State, ToConfirmMsgs), + noreply(State#q{msgs_on_disk = + gb_sets:difference(MOD, ToConfirmMsgs), + msg_indices_on_disk = + gb_sets:difference(gb_sets:union(MIOD, GuidSet), + ToConfirmMsgs)}); + handle_cast({reject, AckTags, Requeue, ChPid}, State = #q{backing_queue = BQ, backing_queue_state = BQS}) -> case lookup_ch(ChPid) of diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 09eb7cfda2..9e917fe599 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -634,7 +634,6 @@ handle_cast({write, Pid, Guid, Msg}, %% New message, lots to do {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl), {ok, TotalSize} = rabbit_msg_file:append(CurHdl, Guid, Msg), - rabbit_log:info("message ~p written to disk~n", [Guid]), ok = index_insert(#msg_location { guid = Guid, ref_count = 1, file = CurFile, offset = CurOffset, total_size = TotalSize }, @@ -826,7 +825,6 @@ internal_sync(State = #msstate { current_file_handle = CurHdl, on_sync = Syncs, pid_to_fun = PTF, pid_to_guids = PTG }) -> - rabbit_log:info("msg_store syncing~ncallbacks: ~p~nguids: ~p~n", [dict:to_list(PTF), dict:to_list(PTG)]), State1 = stop_sync_timer(State), State2 = case Syncs of [] -> State1; diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 2b0919a1f3..454193c22f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -369,6 +369,7 @@ stop_msg_store() -> ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE). init(QueueName, IsDurable, Recover) -> + Self = self(), {DeltaCount, Terms, IndexState} = rabbit_queue_index:init( QueueName, Recover, @@ -377,7 +378,7 @@ init(QueueName, IsDurable, Recover) -> rabbit_msg_store:contains(?PERSISTENT_MSG_STORE, Guid) end, fun (Guids) -> - rabbit_log:info("message indices ~p commited to disk~n", [Guids]) + gen_server2:cast(Self, {msg_indices_written_to_disk, Guids}) end), {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState), @@ -401,7 +402,7 @@ init(QueueName, IsDurable, Recover) -> true -> rabbit_msg_store:client_init(?PERSISTENT_MSG_STORE, PRef); false -> undefined end, - register_puback_callback(?PERSISTENT_MSG_STORE), + register_confirm_callback(?PERSISTENT_MSG_STORE), TransientClient = rabbit_msg_store:client_init(?TRANSIENT_MSG_STORE, TRef), State = #vqstate { q1 = queue:new(), @@ -1072,24 +1073,15 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, msg_store_clients = MSCState1 }}. %%---------------------------------------------------------------------------- -%% Internal gubbins for pubacks +%% Internal gubbins for confirms %%---------------------------------------------------------------------------- -register_puback_callback(MessageStore) -> +register_confirm_callback(MessageStore) -> + Self = self(), rabbit_msg_store:register_callback( MessageStore, fun (Guids) -> - spawn(fun () -> ok = rabbit_misc:with_exit_handler( - fun () -> - rabbit_log:info("something bad happened while sending pubacks back to channel"), - ok - end, - fun () -> - lists:foreach(fun(G) -> - rabbit_log:info("send puback back to channel for ~p~n", [G]) end, Guids), - ok - end) - end) + gen_server2:cast(Self, {msgs_written_to_disk, Guids}) end). %%---------------------------------------------------------------------------- |
