summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--ebin/rabbit_app.in2
-rw-r--r--src/rabbit_queue_index.erl35
-rw-r--r--src/rabbit_variable_queue.erl213
3 files changed, 176 insertions, 74 deletions
diff --git a/ebin/rabbit_app.in b/ebin/rabbit_app.in
index 4bfdd192f5..d9da66c812 100644
--- a/ebin/rabbit_app.in
+++ b/ebin/rabbit_app.in
@@ -26,7 +26,7 @@
%% breaks the QPid Java client
{frame_max, 131072},
{channel_max, 0},
- {heartbeat, 580},
+ {heartbeat, 60},
{msg_store_file_size_limit, 16777216},
{fhc_write_buffering, true},
{fhc_read_buffering, true},
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 7c8fdf38c0..b33425c5a3 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -16,7 +16,7 @@
-module(rabbit_queue_index).
--export([erase/1, init/3, recover/6,
+-export([erase/1, init/3, reset_state/1, recover/6,
terminate/2, delete_and_terminate/1,
pre_publish/7, flush_pre_publish_cache/2,
publish/6, deliver/2, ack/2, sync/1, needs_sync/1, flush/1,
@@ -224,6 +224,7 @@
-type(shutdown_terms() :: [term()] | 'non_clean_shutdown').
-spec(erase/1 :: (rabbit_amqqueue:name()) -> 'ok').
+-spec(reset_state/1 :: (qistate()) -> qistate()).
-spec(init/3 :: (rabbit_amqqueue:name(),
on_sync_fun(), on_sync_fun()) -> qistate()).
-spec(recover/6 :: (rabbit_amqqueue:name(), shutdown_terms(), boolean(),
@@ -261,10 +262,19 @@
erase(Name) ->
#qistate { dir = Dir } = blank_state(Name),
- case rabbit_file:is_dir(Dir) of
- true -> rabbit_file:recursive_delete([Dir]);
- false -> ok
- end.
+ erase_index_dir(Dir).
+
+%% used during variable queue purge when there are no pending acks
+reset_state(#qistate{ dir = Dir,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
+ journal_handle = JournalHdl }) ->
+ ok = erase_index_dir(Dir),
+ ok = case JournalHdl of
+ undefined -> ok;
+ _ -> file_handle_cache:close(JournalHdl)
+ end,
+ blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun).
init(Name, OnSyncFun, OnSyncMsgFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
@@ -507,11 +517,22 @@ all_queue_directory_names(Dir) ->
%% startup and shutdown
%%----------------------------------------------------------------------------
+erase_index_dir(Dir) ->
+ case rabbit_file:is_dir(Dir) of
+ true -> rabbit_file:recursive_delete([Dir]);
+ false -> ok
+ end.
+
blank_state(QueueName) ->
blank_state_dir(
filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
blank_state_dir(Dir) ->
+ blank_state_dir_funs(Dir,
+ fun (_) -> ok end,
+ fun (_) -> ok end).
+
+blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
#qistate { dir = Dir,
@@ -519,8 +540,8 @@ blank_state_dir(Dir) ->
journal_handle = undefined,
dirty_count = 0,
max_journal_entries = MaxJournal,
- on_sync = fun (_) -> ok end,
- on_sync_msg = fun (_) -> ok end,
+ on_sync = OnSyncFun,
+ on_sync_msg = OnSyncMsgFun,
unconfirmed = gb_sets:new(),
unconfirmed_msg = gb_sets:new(),
pre_publish_cache = [],
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 41a13ac9b6..15eac57e68 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -532,38 +532,29 @@ terminate(_Reason, State) ->
%% the only difference between purge and delete is that delete also
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
- %% TODO: there is no need to interact with qi at all - which we do
- %% as part of 'purge' and 'purge_pending_ack', other than
- %% deleting it.
- {_PurgeCount, State1} = purge(State),
- State2 = #vqstate { index_state = IndexState,
- msg_store_clients = {MSCStateP, MSCStateT} } =
- purge_pending_ack(false, State1),
- IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ %% Normally when we purge messages we interact with the qi by
+ %% issuing delivers and acks for every purged message. In this case
+ %% we don't need to do that, so we just delete the qi.
+ State1 = purge_and_index_reset(State),
+ State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack_delete_and_terminate(State1),
case MSCStateP of
undefined -> ok;
_ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
end,
rabbit_msg_store:client_delete_and_terminate(MSCStateT),
- a(State2 #vqstate { index_state = IndexState1,
- msg_store_clients = undefined }).
+ a(State2 #vqstate { msg_store_clients = undefined }).
delete_crashed(#amqqueue{name = QName}) ->
ok = rabbit_queue_index:erase(QName).
-purge(State = #vqstate { q4 = Q4,
- len = Len }) ->
- %% TODO: when there are no pending acks, which is a common case,
- %% we could simply wipe the qi instead of issuing delivers and
- %% acks for all the messages.
- State1 = remove_queue_entries(Q4, State),
-
- State2 = #vqstate { q1 = Q1 } =
- purge_betas_and_deltas(State1 #vqstate { q4 = ?QUEUE:new() }),
-
- State3 = remove_queue_entries(Q1, State2),
-
- {Len, a(State3 #vqstate { q1 = ?QUEUE:new() })}.
+purge(State = #vqstate { len = Len }) ->
+ case is_pending_ack_empty(State) of
+ true ->
+ {Len, purge_and_index_reset(State)};
+ false ->
+ {Len, purge_when_pending_acks(State)}
+ end.
purge_acks(State) -> a(purge_pending_ack(false, State)).
@@ -753,10 +744,8 @@ len(#vqstate { len = Len }) -> Len.
is_empty(State) -> 0 == len(State).
-depth(State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA }) ->
- len(State) + gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+depth(State) ->
+ len(State) + count_pending_acks(State).
set_ram_duration_target(
DurationTarget, State = #vqstate {
@@ -1133,7 +1122,7 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, DelsAndAcksFun, State) ->
{Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
lists:foldr(
fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
@@ -1156,9 +1145,7 @@ betas_from_index_entries(List, TransientThreshold, RPA, DPA, QPA, IndexState) ->
end
end
end, {?QUEUE:new(), [], [], 0, 0}, List),
- {Filtered, RamReadyCount, RamBytes,
- rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState))}.
+ {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
@@ -1384,25 +1371,75 @@ remove(AckRequired, MsgStatus = #msg_status {
State2 #vqstate {out_counter = OutCount + 1,
index_state = IndexState2})}.
-purge_betas_and_deltas(State = #vqstate { q3 = Q3 }) ->
+%%----------------------------------------------------------------------------
+%% Helpers for Public API purge/1 function
+%%----------------------------------------------------------------------------
+
+%% The difference between purge_when_pending_acks/1
+%% vs. purge_and_index_reset/1 is that the first one issues a deliver
+%% and an ack to the queue index for every message that's being
+%% removed, while the latter just resets the queue index state.
+purge_when_pending_acks(State) ->
+ State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State),
+ a(State1).
+
+purge_and_index_reset(State) ->
+ State1 = purge1(process_delivers_and_acks_fun(none), State),
+ a(reset_qi_state(State1)).
+
+%% This function removes messages from each of {q1, q2, q3, q4}.
+%%
+%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
+%% are specially handled by purge_betas_and_deltas/2.
+%%
+%% purge_betas_and_deltas/2 loads messages from the queue index,
+%% filling up q3 and in some cases moving messages from q2 to q3 while
+%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
+%% messages loaded into q3 are removed by calling
+%% remove_queue_entries/3 until there are no more messages to be read
+%% from the queue index. Messages are read in batches from the queue
+%% index.
+purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
+ State1 = remove_queue_entries(Q4, AfterFun, State),
+
+ State2 = #vqstate {q1 = Q1} =
+ purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),
+
+ State3 = remove_queue_entries(Q1, AfterFun, State2),
+
+ a(State3#vqstate{q1 = ?QUEUE:new()}).
+
+reset_qi_state(State = #vqstate{index_state = IndexState}) ->
+ State#vqstate{index_state =
+ rabbit_queue_index:reset_state(IndexState)}.
+
+is_pending_ack_empty(State) ->
+ count_pending_acks(State) =:= 0.
+
+count_pending_acks(#vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
+ gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
+purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { q3 = Q3 }) ->
case ?QUEUE:is_empty(Q3) of
true -> State;
- false -> State1 = remove_queue_entries(Q3, State),
- purge_betas_and_deltas(maybe_deltas_to_betas(
+ false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State),
+ purge_betas_and_deltas(DelsAndAcksFun,
+ maybe_deltas_to_betas(
+ DelsAndAcksFun,
State1#vqstate{q3 = ?QUEUE:new()}))
end.
-remove_queue_entries(Q, State = #vqstate{index_state = IndexState,
- msg_store_clients = MSCState}) ->
+remove_queue_entries(Q, DelsAndAcksFun,
+ State = #vqstate{msg_store_clients = MSCState}) ->
{MsgIdsByStore, Delivers, Acks, State1} =
?QUEUE:foldl(fun remove_queue_entries1/2,
{orddict:new(), [], [], State}, Q),
ok = orddict:fold(fun (IsPersistent, MsgIds, ok) ->
msg_store_remove(MSCState, IsPersistent, MsgIds)
end, ok, MsgIdsByStore),
- IndexState1 = rabbit_queue_index:ack(
- Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
- State1#vqstate{index_state = IndexState1}.
+ DelsAndAcksFun(Delivers, Acks, State1).
remove_queue_entries1(
#msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
@@ -1417,6 +1454,18 @@ remove_queue_entries1(
cons_if(IndexOnDisk, SeqId, Acks),
stats({-1, 0}, {MsgStatus, none}, State)}.
+process_delivers_and_acks_fun(deliver_and_ack) ->
+ fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
+ IndexState1 =
+ rabbit_queue_index:ack(
+ Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
+ State #vqstate { index_state = IndexState1 }
+ end;
+process_delivers_and_acks_fun(_) ->
+ fun (_, _, State) ->
+ State
+ end.
+
%%----------------------------------------------------------------------------
%% Internal gubbins for publishing
%%----------------------------------------------------------------------------
@@ -1611,11 +1660,29 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
end.
purge_pending_ack(KeepPersistent,
- State = #vqstate { ram_pending_ack = RPA,
- disk_pending_ack = DPA,
- qi_pending_ack = QPA,
- index_state = IndexState,
+ State = #vqstate { index_state = IndexState,
msg_store_clients = MSCState }) ->
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ case KeepPersistent of
+ true -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
+ State1;
+ false -> IndexState1 =
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }
+ end.
+
+purge_pending_ack_delete_and_terminate(
+ State = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
+ IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ State1 #vqstate { index_state = IndexState1 }.
+
+purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
+ disk_pending_ack = DPA,
+ qi_pending_ack = QPA }) ->
F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
{IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
rabbit_misc:gb_trees_fold(
@@ -1625,19 +1692,26 @@ purge_pending_ack(KeepPersistent,
State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
disk_pending_ack = gb_trees:empty(),
qi_pending_ack = gb_trees:empty()},
+ {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
- case KeepPersistent of
- true -> case orddict:find(false, MsgIdsByStore) of
- error -> State1;
- {ok, MsgIds} -> ok = msg_store_remove(MSCState, false,
- MsgIds),
- State1
- end;
- false -> IndexState1 =
- rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
- [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- State1 #vqstate { index_state = IndexState1 }
+%% MsgIdsByStore is an orddict with two keys:
+%%
+%% true: holds a list of Persistent Message Ids.
+%% false: holds a list of Transient Message Ids.
+%%
+%% When we call orddict:to_list/1 we get two sets of msg ids, where
+%% IsPersistent is either true for persistent messages or false for
+%% transient ones. The msg_store_remove/3 function takes this boolean
+%% flag to determine which store the messages should be removed
+%% from.
+remove_msgs_by_id(MsgIdsByStore, MSCState) ->
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
+
+remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
+ case orddict:find(false, MsgIdsByStore) of
+ error -> ok;
+ {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
end.
accumulate_ack_init() -> {[], orddict:new(), []}.
@@ -1971,9 +2045,15 @@ fetch_from_q3(State = #vqstate { q1 = Q1,
{loaded, {MsgStatus, State2}}
end.
-maybe_deltas_to_betas(State = #vqstate { delta = ?BLANK_DELTA_PATTERN(X) }) ->
+maybe_deltas_to_betas(State) ->
+ AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+ maybe_deltas_to_betas(AfterFun, State).
+
+maybe_deltas_to_betas(_DelsAndAcksFun,
+ State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
State;
-maybe_deltas_to_betas(State = #vqstate {
+maybe_deltas_to_betas(DelsAndAcksFun,
+ State = #vqstate {
q2 = Q2,
delta = Delta,
q3 = Q3,
@@ -1993,19 +2073,20 @@ maybe_deltas_to_betas(State = #vqstate {
DeltaSeqIdEnd]),
{List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
IndexState),
- {Q3a, RamCountsInc, RamBytesInc, IndexState2} =
+ {Q3a, RamCountsInc, RamBytesInc, State1} =
betas_from_index_entries(List, TransientThreshold,
- RPA, DPA, QPA, IndexState1),
- State1 = State #vqstate { index_state = IndexState2,
- ram_msg_count = RamMsgCount + RamCountsInc,
- ram_bytes = RamBytes + RamBytesInc,
- disk_read_count = DiskReadCount + RamCountsInc},
+ RPA, DPA, QPA, DelsAndAcksFun,
+ State #vqstate { index_state = IndexState1 }),
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount + RamCountsInc,
+ ram_bytes = RamBytes + RamBytesInc,
+ disk_read_count = DiskReadCount + RamCountsInc },
case ?QUEUE:len(Q3a) of
0 ->
%% we ignored every message in the segment due to it being
%% transient and below the threshold
maybe_deltas_to_betas(
- State1 #vqstate {
+ DelsAndAcksFun,
+ State2 #vqstate {
delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
Q3aLen ->
Q3b = ?QUEUE:join(Q3, Q3a),
@@ -2013,14 +2094,14 @@ maybe_deltas_to_betas(State = #vqstate {
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
- State1 #vqstate { q2 = ?QUEUE:new(),
+ State2 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
q3 = ?QUEUE:join(Q3b, Q2) };
N when N > 0 ->
Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
count = N,
end_seq_id = DeltaSeqIdEnd }),
- State1 #vqstate { delta = Delta1,
+ State2 #vqstate { delta = Delta1,
q3 = Q3b }
end
end.