diff options
| -rw-r--r-- | ebin/rabbit_app.in | 2 | ||||
| -rw-r--r-- | src/rabbit_queue_index.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 213 |
3 files changed, 176 insertions, 74 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in index 4bfdd192f5..d9da66c812 100644 --- a/ebin/rabbit_app.in +++ b/ebin/rabbit_app.in @@ -26,7 +26,7 @@ %% breaks the QPid Java client {frame_max, 131072}, {channel_max, 0}, - {heartbeat, 580}, + {heartbeat, 60}, {msg_store_file_size_limit, 16777216}, {fhc_write_buffering, true}, {fhc_read_buffering, true}, diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index 7c8fdf38c0..b33425c5a3 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -16,7 +16,7 @@ -module(rabbit_queue_index). --export([erase/1, init/3, recover/6, +-export([erase/1, init/3, reset_state/1, recover/6, terminate/2, delete_and_terminate/1, pre_publish/7, flush_pre_publish_cache/2, publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1, @@ -224,6 +224,7 @@ -type(shutdown_terms() :: [term()] | 'non_clean_shutdown'). -spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok'). +-spec(reset_state/1 :: (qistate()) -> qistate()). -spec(init/3 :: (rabbit_amqqueue:name(), on_sync_fun(), on_sync_fun()) -> qistate()). -spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(), @@ -261,10 +262,19 @@ erase(Name) -> #qistate { dir = Dir } = blank_state(Name), - case rabbit_file:is_dir(Dir) of - true -> rabbit_file:recursive_delete([Dir]); - false -> ok - end. + erase_index_dir(Dir). + +%% used during variable queue purge when there are no pending acks +reset_state(#qistate{ dir = Dir, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun, + journal_handle = JournalHdl }) -> + ok = erase_index_dir(Dir), + ok = case JournalHdl of + undefined -> ok; + _ -> file_handle_cache:close(JournalHdl) + end, + blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun). 
init(Name, OnSyncFun, OnSyncMsgFun) -> State = #qistate { dir = Dir } = blank_state(Name), @@ -507,11 +517,22 @@ all_queue_directory_names(Dir) -> %% startup and shutdown %%---------------------------------------------------------------------------- +erase_index_dir(Dir) -> + case rabbit_file:is_dir(Dir) of + true -> rabbit_file:recursive_delete([Dir]); + false -> ok + end. + blank_state(QueueName) -> blank_state_dir( filename:join(queues_dir(), queue_name_to_dir_name(QueueName))). blank_state_dir(Dir) -> + blank_state_dir_funs(Dir, + fun (_) -> ok end, + fun (_) -> ok end). + +blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) -> {ok, MaxJournal} = application:get_env(rabbit, queue_index_max_journal_entries), #qistate { dir = Dir, @@ -519,8 +540,8 @@ blank_state_dir(Dir) -> journal_handle = undefined, dirty_count = 0, max_journal_entries = MaxJournal, - on_sync = fun (_) -> ok end, - on_sync_msg = fun (_) -> ok end, + on_sync = OnSyncFun, + on_sync_msg = OnSyncMsgFun, unconfirmed = gb_sets:new(), unconfirmed_msg = gb_sets:new(), pre_publish_cache = [], diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 41a13ac9b6..15eac57e68 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -532,38 +532,29 @@ terminate(_Reason, State) -> %% the only difference between purge and delete is that delete also %% needs to delete everything that's been delivered and not ack'd. delete_and_terminate(_Reason, State) -> - %% TODO: there is no need to interact with qi at all - which we do - %% as part of 'purge' and 'purge_pending_ack', other than - %% deleting it. - {_PurgeCount, State1} = purge(State), - State2 = #vqstate { index_state = IndexState, - msg_store_clients = {MSCStateP, MSCStateT} } = - purge_pending_ack(false, State1), - IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), + %% Normally when we purge messages we interact with the qi by + %% issuing delivers and acks for every purged message. 
In this case + %% we don't need to do that, so we just delete the qi. + State1 = purge_and_index_reset(State), + State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } = + purge_pending_ack_delete_and_terminate(State1), case MSCStateP of undefined -> ok; _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP) end, rabbit_msg_store:client_delete_and_terminate(MSCStateT), - a(State2 #vqstate { index_state = IndexState1, - msg_store_clients = undefined }). + a(State2 #vqstate { msg_store_clients = undefined }). delete_crashed(#amqqueue{name = QName}) -> ok = rabbit_queue_index:erase(QName). -purge(State = #vqstate { q4 = Q4, - len = Len }) -> - %% TODO: when there are no pending acks, which is a common case, - %% we could simply wipe the qi instead of issuing delivers and - %% acks for all the messages. - State1 = remove_queue_entries(Q4, State), - - State2 = #vqstate { q1 = Q1 } = - purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }), - - State3 = remove_queue_entries(Q1, State2), - - {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}. +purge(State = #vqstate { len = Len }) -> + case is_pending_ack_empty(State) of + true -> + {Len, purge_and_index_reset(State)}; + false -> + {Len, purge_when_pending_acks(State)} + end. purge_acks(State) -> a(purge_pending_ack(false, State)). @@ -753,10 +744,8 @@ len(#vqstate { len = Len }) -> Len. is_empty(State) -> 0 == len(State). -depth(State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA }) -> - len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). +depth(State) -> + len(State) + count_pending_acks(State). set_ram_duration_target( DurationTarget, State = #vqstate { @@ -1133,7 +1122,7 @@ maybe_write_delivered(false, _SeqId, IndexState) -> maybe_write_delivered(true, SeqId, IndexState) -> rabbit_queue_index:deliver([SeqId], IndexState). 
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> +betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) -> {Filtered, Delivers, Acks, RamReadyCount, RamBytes} = lists:foldr( fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M, @@ -1156,9 +1145,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) -> end end end, {?QUEUE:new(), [], [], 0, 0}, List), - {Filtered, RamReadyCount, RamBytes, - rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState))}. + {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}. %% [0] We don't increase RamBytes here, even though it pertains to %% unacked messages too, since if HaveMsg then the message must have %% been stored in the QI, thus the message must have been in @@ -1384,25 +1371,75 @@ remove(AckRequired, MsgStatus = #msg_status { State2 #vqstate {out_counter = OutCount + 1, index_state = IndexState2})}. -purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) -> +%%---------------------------------------------------------------------------- +%% Helpers for Public API purge/1 function +%%---------------------------------------------------------------------------- + +%% The difference between purge_when_pending_acks/1 +%% vs. purge_and_index_reset/1 is that the first one issues a deliver +%% and an ack to the queue index for every message that's being +%% removed, while the latter just resets the queue index state. +purge_when_pending_acks(State) -> + State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State), + a(State1). + +purge_and_index_reset(State) -> + State1 = purge1(process_delivers_and_acks_fun(none), State), + a(reset_qi_state(State1)). + +%% This function removes messages from each of {q1, q2, q3, q4}. +%% +%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3 +%% are specially handled by purge_betas_and_deltas/2. 
+%% +%% purge_betas_and_deltas/2 loads messages from the queue index, +%% filling up q3 and in some cases moving messages from q2 to q3 while +%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The +%% messages loaded into q3 are removed by calling +%% remove_queue_entries/3 until there are no more messages to be read +%% from the queue index. Messages are read in batches from the queue +%% index. +purge1(AfterFun, State = #vqstate { q4 = Q4}) -> + State1 = remove_queue_entries(Q4, AfterFun, State), + + State2 = #vqstate {q1 = Q1} = + purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}), + + State3 = remove_queue_entries(Q1, AfterFun, State2), + + a(State3#vqstate{q1 = ?QUEUE:new()}). + +reset_qi_state(State = #vqstate{index_state = IndexState}) -> + State#vqstate{index_state = + rabbit_queue_index:reset_state(IndexState)}. + +is_pending_ack_empty(State) -> + count_pending_acks(State) =:= 0. + +count_pending_acks(#vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA). + +purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) -> case ?QUEUE:is_empty(Q3) of true -> State; - false -> State1 = remove_queue_entries(Q3, State), - purge_betas_and_deltas(maybe_deltas_to_betas( + false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State), + purge_betas_and_deltas(DelsAndAcksFun, + maybe_deltas_to_betas( + DelsAndAcksFun, State1#vqstate{q3 = ?QUEUE:new()})) end. 
-remove_queue_entries(Q, State = #vqstate{index_state = IndexState, - msg_store_clients = MSCState}) -> +remove_queue_entries(Q, DelsAndAcksFun, + State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, {orddict:new(), [], [], State}, Q), ok = orddict:fold(fun (IsPersistent, MsgIds, ok) -> msg_store_remove(MSCState, IsPersistent, MsgIds) end, ok, MsgIdsByStore), - IndexState1 = rabbit_queue_index:ack( - Acks, rabbit_queue_index:deliver(Delivers, IndexState)), - State1#vqstate{index_state = IndexState1}. + DelsAndAcksFun(Delivers, Acks, State1). remove_queue_entries1( #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered, @@ -1417,6 +1454,18 @@ remove_queue_entries1( cons_if(IndexOnDisk, SeqId, Acks), stats({-1, 0}, {MsgStatus, none}, State)}. +process_delivers_and_acks_fun(deliver_and_ack) -> + fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) -> + IndexState1 = + rabbit_queue_index:ack( + Acks, rabbit_queue_index:deliver(Delivers, IndexState)), + State #vqstate { index_state = IndexState1 } + end; +process_delivers_and_acks_fun(_) -> + fun (_, _, State) -> + State + end. + %%---------------------------------------------------------------------------- %% Internal gubbins for publishing %%---------------------------------------------------------------------------- @@ -1611,11 +1660,29 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, end. 
purge_pending_ack(KeepPersistent, - State = #vqstate { ram_pending_ack = RPA, - disk_pending_ack = DPA, - qi_pending_ack = QPA, - index_state = IndexState, + State = #vqstate { index_state = IndexState, msg_store_clients = MSCState }) -> + {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State), + case KeepPersistent of + true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState), + State1; + false -> IndexState1 = + rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), + remove_msgs_by_id(MsgIdsByStore, MSCState), + State1 #vqstate { index_state = IndexState1 } + end. + +purge_pending_ack_delete_and_terminate( + State = #vqstate { index_state = IndexState, + msg_store_clients = MSCState }) -> + {_, MsgIdsByStore, State1} = purge_pending_ack1(State), + IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState), + remove_msgs_by_id(MsgIdsByStore, MSCState), + State1 #vqstate { index_state = IndexState1 }. + +purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, + disk_pending_ack = DPA, + qi_pending_ack = QPA }) -> F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end, {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} = rabbit_misc:gb_trees_fold( @@ -1625,19 +1692,26 @@ purge_pending_ack(KeepPersistent, State1 = State #vqstate { ram_pending_ack = gb_trees:empty(), disk_pending_ack = gb_trees:empty(), qi_pending_ack = gb_trees:empty()}, + {IndexOnDiskSeqIds, MsgIdsByStore, State1}. - case KeepPersistent of - true -> case orddict:find(false, MsgIdsByStore) of - error -> State1; - {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, - MsgIds), - State1 - end; - false -> IndexState1 = - rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), - [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)], - State1 #vqstate { index_state = IndexState1 } +%% MsgIdsByStore is an orddict with two keys: +%% +%% true: holds a list of Persistent Message Ids. 
+%% false: holds a list of Transient Message Ids. +%% +%% When we call orddict:to_list/1 we get two sets of msg ids, where +%% IsPersistent is either true for persistent messages or false for +%% transient ones. The msg_store_remove/3 function takes this boolean +%% flag to determine from which store the messages should be removed +%% from. +remove_msgs_by_id(MsgIdsByStore, MSCState) -> + [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) + || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. + +remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> + case orddict:find(false, MsgIdsByStore) of + error -> ok; + {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) end. accumulate_ack_init() -> {[], orddict:new(), []}. @@ -1971,9 +2045,15 @@ fetch_from_q3(State = #vqstate { q1 = Q1, {loaded, {MsgStatus, State2}} end. -maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) -> +maybe_deltas_to_betas(State) -> + AfterFun = process_delivers_and_acks_fun(deliver_and_ack), + maybe_deltas_to_betas(AfterFun, State). 
+ +maybe_deltas_to_betas(_DelsAndAcksFun, + State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) -> State; -maybe_deltas_to_betas(State = #vqstate { +maybe_deltas_to_betas(DelsAndAcksFun, + State = #vqstate { q2 = Q2, delta = Delta, q3 = Q3, @@ -1993,19 +2073,20 @@ maybe_deltas_to_betas(State = #vqstate { DeltaSeqIdEnd]), {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState), - {Q3a, RamCountsInc, RamBytesInc, IndexState2} = + {Q3a, RamCountsInc, RamBytesInc, State1} = betas_from_index_entries(List, TransientThreshold, - RPA, DPA, QPA, IndexState1), - State1 = State #vqstate { index_state = IndexState2, - ram_msg_count = RamMsgCount + RamCountsInc, - ram_bytes = RamBytes + RamBytesInc, - disk_read_count = DiskReadCount + RamCountsInc}, + RPA, DPA, QPA, DelsAndAcksFun, + State #vqstate { index_state = IndexState1 }), + State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc, + ram_bytes = RamBytes + RamBytesInc, + disk_read_count = DiskReadCount + RamCountsInc }, case ?QUEUE:len(Q3a) of 0 -> %% we ignored every message in the segment due to it being %% transient and below the threshold maybe_deltas_to_betas( - State1 #vqstate { + DelsAndAcksFun, + State2 #vqstate { delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })}); Q3aLen -> Q3b = ?QUEUE:join(Q3, Q3a), @@ -2013,14 +2094,14 @@ maybe_deltas_to_betas(State = #vqstate { 0 -> %% delta is now empty, but it wasn't before, so %% can now join q2 onto q3 - State1 #vqstate { q2 = ?QUEUE:new(), + State2 #vqstate { q2 = ?QUEUE:new(), delta = ?BLANK_DELTA, q3 = ?QUEUE:join(Q3b, Q2) }; N when N > 0 -> Delta1 = d(#delta { start_seq_id = DeltaSeqId1, count = N, end_seq_id = DeltaSeqIdEnd }), - State1 #vqstate { delta = Delta1, + State2 #vqstate { delta = Delta1, q3 = Q3b } end end. |
