summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@rabbitmq.com>2011-09-30 12:47:05 +0100
committerMatthias Radestock <matthias@rabbitmq.com>2011-09-30 12:47:05 +0100
commitf7a0cb7577bdb6edb5ce591018e06a5ed44ab307 (patch)
tree5afb621c94d75b7063809f4170c4434629ffcd8c
parent3b5732011382a264507fa2f71349f1316cdc48dc (diff)
parent8954d59776d3e9bf818098f58faddf3f90bc3941 (diff)
downloadrabbitmq-server-git-f7a0cb7577bdb6edb5ce591018e06a5ed44ab307.tar.gz
merge bug23764 into default
-rw-r--r--src/rabbit_backing_queue_qc.erl122
-rw-r--r--src/rabbit_channel.erl6
-rw-r--r--src/rabbit_tests.erl35
-rw-r--r--src/rabbit_variable_queue.erl352
4 files changed, 340 insertions, 175 deletions
diff --git a/src/rabbit_backing_queue_qc.erl b/src/rabbit_backing_queue_qc.erl
index 706f54c0ab..095202dd11 100644
--- a/src/rabbit_backing_queue_qc.erl
+++ b/src/rabbit_backing_queue_qc.erl
@@ -37,19 +37,21 @@
-export([prop_backing_queue_test/0, publish_multiple/4, timeout/2]).
-record(state, {bqstate,
- len, %% int
- messages, %% queue of {msg_props, basic_msg}
- acks, %% dict of acktag => {msg_props, basic_msg}
- confirms}). %% set of msgid
+ len, %% int
+ next_seq_id, %% int
+ messages, %% gb_trees of seqid => {msg_props, basic_msg}
+ acks, %% [{acktag, {seqid, {msg_props, basic_msg}}}]
+ confirms}). %% set of msgid
%% Initialise model
initial_state() ->
- #state{bqstate = qc_variable_queue_init(qc_test_queue()),
- len = 0,
- messages = queue:new(),
- acks = orddict:new(),
- confirms = gb_sets:new()}.
+ #state{bqstate = qc_variable_queue_init(qc_test_queue()),
+ len = 0,
+ next_seq_id = 0,
+ messages = gb_trees:empty(),
+ acks = [],
+ confirms = gb_sets:new()}.
%% Property
@@ -123,11 +125,11 @@ qc_fetch(#state{bqstate = BQ}) ->
{call, ?BQMOD, fetch, [boolean(), BQ]}.
qc_ack(#state{bqstate = BQ, acks = Acks}) ->
- {call, ?BQMOD, ack, [rand_choice(orddict:fetch_keys(Acks)), BQ]}.
+ {call, ?BQMOD, ack, [rand_choice(proplists:get_keys(Acks)), BQ]}.
qc_requeue(#state{bqstate = BQ, acks = Acks}) ->
{call, ?BQMOD, requeue,
- [rand_choice(orddict:fetch_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
+ [rand_choice(proplists:get_keys(Acks)), fun(MsgOpts) -> MsgOpts end, BQ]}.
qc_set_ram_duration_target(#state{bqstate = BQ}) ->
{call, ?BQMOD, set_ram_duration_target,
@@ -155,10 +157,10 @@ qc_purge(#state{bqstate = BQ}) ->
precondition(#state{acks = Acks}, {call, ?BQMOD, Fun, _Arg})
when Fun =:= ack; Fun =:= requeue ->
- orddict:size(Acks) > 0;
+ length(Acks) > 0;
precondition(#state{messages = Messages},
- {call, ?BQMOD, publish_delivered, _Arg}) ->
- queue:is_empty(Messages);
+ {call, ?BQMOD, publish_delivered, _Arg}) ->
+ gb_trees:is_empty(Messages);
precondition(_S, {call, ?BQMOD, _Fun, _Arg}) ->
true;
precondition(_S, {call, ?MODULE, timeout, _Arg}) ->
@@ -169,14 +171,18 @@ precondition(#state{len = Len}, {call, ?MODULE, publish_multiple, _Arg}) ->
%% Model updates
next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
- #state{len = Len, messages = Messages, confirms = Confirms} = S,
+ #state{len = Len,
+ messages = Messages,
+ confirms = Confirms,
+ next_seq_id = NextSeq} = S,
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
NeedsConfirm =
{call, erlang, element,
[?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ,
len = Len + 1,
- messages = queue:in({MsgProps, Msg}, Messages),
+ next_seq_id = NextSeq + 1,
+ messages = gb_trees:insert(NextSeq, {MsgProps, Msg}, Messages),
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
@@ -184,17 +190,19 @@ next_state(S, BQ, {call, ?BQMOD, publish, [Msg, MsgProps, _Pid, _BQ]}) ->
next_state(S, BQ, {call, _, publish_multiple, [Msg, MsgProps, _BQ, Count]}) ->
#state{len = Len, messages = Messages} = S,
- Messages1 = repeat(Messages, fun(Msgs) ->
- queue:in({MsgProps, Msg}, Msgs)
- end, Count),
- S#state{bqstate = BQ,
- len = Len + Count,
- messages = Messages1};
+ {S1, Msgs1} = repeat({S, Messages},
+ fun ({#state{next_seq_id = NextSeq} = State, Msgs}) ->
+ {State #state { next_seq_id = NextSeq + 1},
+ gb_trees:insert(NextSeq, {MsgProps, Msg}, Msgs)}
+ end, Count),
+ S1#state{bqstate = BQ,
+ len = Len + Count,
+ messages = Msgs1};
next_state(S, Res,
{call, ?BQMOD, publish_delivered,
[AckReq, Msg, MsgProps, _Pid, _BQ]}) ->
- #state{confirms = Confirms, acks = Acks} = S,
+ #state{confirms = Confirms, acks = Acks, next_seq_id = NextSeq} = S,
AckTag = {call, erlang, element, [1, Res]},
BQ1 = {call, erlang, element, [2, Res]},
MsgId = {call, erlang, element, [?RECORD_INDEX(id, basic_message), Msg]},
@@ -202,12 +210,13 @@ next_state(S, Res,
{call, erlang, element,
[?RECORD_INDEX(needs_confirming, message_properties), MsgProps]},
S#state{bqstate = BQ1,
+ next_seq_id = NextSeq + 1,
confirms = case eval(NeedsConfirm) of
true -> gb_sets:add(MsgId, Confirms);
_ -> Confirms
end,
acks = case AckReq of
- true -> orddict:append(AckTag, {MsgProps, Msg}, Acks);
+ true -> [{AckTag, {NextSeq, {MsgProps, Msg}}}|Acks];
false -> Acks
end
};
@@ -218,34 +227,36 @@ next_state(S, Res, {call, ?BQMOD, fetch, [AckReq, _BQ]}) ->
BQ1 = {call, erlang, element, [2, Res]},
AckTag = {call, erlang, element, [3, ResultInfo]},
S1 = S#state{bqstate = BQ1},
- case queue:out(Messages) of
- {empty, _M2} ->
- S1;
- {{value, MsgProp_Msg}, M2} ->
- S2 = S1#state{len = Len - 1, messages = M2},
- case AckReq of
- true ->
- S2#state{acks = orddict:append(AckTag, MsgProp_Msg, Acks)};
- false ->
- S2
- end
+ case gb_trees:is_empty(Messages) of
+ true -> S1;
+ false -> {SeqId, MsgProp_Msg, M2} = gb_trees:take_smallest(Messages),
+ S2 = S1#state{len = Len - 1, messages = M2},
+ case AckReq of
+ true ->
+ S2#state{acks = [{AckTag, {SeqId, MsgProp_Msg}}|Acks]};
+ false ->
+ S2
+ end
end;
next_state(S, Res, {call, ?BQMOD, ack, [AcksArg, _BQ]}) ->
#state{acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
S#state{bqstate = BQ1,
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, Res, {call, ?BQMOD, requeue, [AcksArg, _F, _V]}) ->
- #state{len = Len, messages = Messages, acks = AcksState} = S,
+ #state{messages = Messages, acks = AcksState} = S,
BQ1 = {call, erlang, element, [2, Res]},
- RequeueMsgs = lists:append([orddict:fetch(Key, AcksState) ||
- Key <- AcksArg]),
+ Messages1 = lists:foldl(fun (AckTag, Msgs) ->
+ {SeqId, MsgPropsMsg} =
+ proplists:get_value(AckTag, AcksState),
+ gb_trees:insert(SeqId, MsgPropsMsg, Msgs)
+ end, Messages, AcksArg),
S#state{bqstate = BQ1,
- len = Len + length(RequeueMsgs),
- messages = queue:join(Messages, queue:from_list(RequeueMsgs)),
- acks = lists:foldl(fun orddict:erase/2, AcksState, AcksArg)};
+ len = gb_trees:size(Messages1),
+ messages = Messages1,
+ acks = lists:foldl(fun proplists:delete/2, AcksState, AcksArg)};
next_state(S, BQ, {call, ?BQMOD, set_ram_duration_target, _Args}) ->
S#state{bqstate = BQ};
@@ -260,8 +271,8 @@ next_state(S, Res, {call, ?BQMOD, drain_confirmed, _Args}) ->
next_state(S, BQ1, {call, ?BQMOD, dropwhile, _Args}) ->
#state{messages = Messages} = S,
- Messages1 = drop_messages(Messages),
- S#state{bqstate = BQ1, len = queue:len(Messages1), messages = Messages1};
+ Msgs1 = drop_messages(Messages),
+ S#state{bqstate = BQ1, len = gb_trees:size(Msgs1), messages = Msgs1};
next_state(S, _Res, {call, ?BQMOD, is_empty, _Args}) ->
S;
@@ -271,7 +282,7 @@ next_state(S, BQ, {call, ?MODULE, timeout, _Args}) ->
next_state(S, Res, {call, ?BQMOD, purge, _Args}) ->
BQ1 = {call, erlang, element, [2, Res]},
- S#state{bqstate = BQ1, len = 0, messages = queue:new()}.
+ S#state{bqstate = BQ1, len = 0, messages = gb_trees:empty()}.
%% Postconditions
@@ -279,9 +290,9 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
#state{messages = Messages, len = Len, acks = Acks, confirms = Confrms} = S,
case Res of
{{MsgFetched, _IsDelivered, AckTag, RemainingLen}, _BQ} ->
- {_MsgProps, Msg} = queue:head(Messages),
+ {_SeqId, {_MsgProps, Msg}} = gb_trees:smallest(Messages),
MsgFetched =:= Msg andalso
- not orddict:is_key(AckTag, Acks) andalso
+ not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms) andalso
RemainingLen =:= Len - 1;
{empty, _BQ} ->
@@ -290,7 +301,7 @@ postcondition(S, {call, ?BQMOD, fetch, _Args}, Res) ->
postcondition(S, {call, ?BQMOD, publish_delivered, _Args}, {AckTag, _BQ}) ->
#state{acks = Acks, confirms = Confrms} = S,
- not orddict:is_key(AckTag, Acks) andalso
+ not proplists:is_defined(AckTag, Acks) andalso
not gb_sets:is_element(AckTag, Confrms);
postcondition(#state{len = Len}, {call, ?BQMOD, purge, _Args}, Res) ->
@@ -359,7 +370,14 @@ qc_test_queue(Durable) ->
pid = self()}.
rand_choice([]) -> [];
-rand_choice(List) -> [lists:nth(random:uniform(length(List)), List)].
+rand_choice(List) -> rand_choice(List, [], random:uniform(length(List))).
+
+rand_choice(_List, Selection, 0) ->
+ Selection;
+rand_choice(List, Selection, N) ->
+ Picked = lists:nth(random:uniform(length(List)), List),
+ rand_choice(List -- [Picked], [Picked | Selection],
+ N - 1).
dropfun(Props) ->
Expiry = eval({call, erlang, element,
@@ -367,10 +385,10 @@ dropfun(Props) ->
Expiry =/= 1.
drop_messages(Messages) ->
- case queue:out(Messages) of
- {empty, _} ->
+ case gb_trees:is_empty(Messages) of
+ true ->
Messages;
- {{value, MsgProps_Msg}, M2} ->
+ false -> {_Seq, MsgProps_Msg, M2} = gb_trees:take_smallest(Messages),
MsgProps = {call, erlang, element, [1, MsgProps_Msg]},
case dropfun(MsgProps) of
true -> drop_messages(M2);
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index d2f5527726..9eb77c326a 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -836,14 +836,10 @@ handle_method(#'basic.recover_async'{requeue = true},
OkFun = fun () -> ok end,
ok = fold_per_queue(
fun (QPid, MsgIds, ok) ->
- %% The Qpid python test suite incorrectly assumes
- %% that messages will be requeued in their original
- %% order. To keep it happy we reverse the id list
- %% since we are given them in reverse order.
rabbit_misc:with_exit_handler(
OkFun, fun () ->
rabbit_amqqueue:requeue(
- QPid, lists:reverse(MsgIds), self())
+ QPid, MsgIds, self())
end)
end, ok, UAMQ),
ok = notify_limiter(Limiter, UAMQ),
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 44c1337678..010279302e 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -2314,9 +2314,42 @@ test_variable_queue() ->
fun test_variable_queue_all_the_bits_not_covered_elsewhere2/1,
fun test_dropwhile/1,
fun test_dropwhile_varying_ram_duration/1,
- fun test_variable_queue_ack_limiting/1]],
+ fun test_variable_queue_ack_limiting/1,
+ fun test_variable_queue_requeue/1]],
passed.
+test_variable_queue_requeue(VQ0) ->
+ Interval = 50,
+ Count = rabbit_queue_index:next_segment_boundary(0) + 2 * Interval,
+ Seq = lists:seq(1, Count),
+ VQ1 = rabbit_variable_queue:set_ram_duration_target(0, VQ0),
+ VQ2 = variable_queue_publish(false, Count, VQ1),
+ {VQ3, Acks} = lists:foldl(
+ fun (_N, {VQN, AckTags}) ->
+ {{#basic_message{}, false, AckTag, _}, VQM} =
+ rabbit_variable_queue:fetch(true, VQN),
+ {VQM, [AckTag | AckTags]}
+ end, {VQ2, []}, Seq),
+ Subset = lists:foldl(fun ({Ack, N}, Acc) when N rem Interval == 0 ->
+ [Ack | Acc];
+ (_, Acc) ->
+ Acc
+ end, [], lists:zip(Acks, Seq)),
+ {_MsgIds, VQ4} = rabbit_variable_queue:requeue(Acks -- Subset,
+ fun(X) -> X end, VQ3),
+ VQ5 = lists:foldl(fun (AckTag, VQN) ->
+ {_MsgId, VQM} = rabbit_variable_queue:requeue(
+ [AckTag], fun(X) -> X end, VQN),
+ VQM
+ end, VQ4, Subset),
+ VQ6 = lists:foldl(fun (AckTag, VQa) ->
+ {{#basic_message{}, true, AckTag, _}, VQb} =
+ rabbit_variable_queue:fetch(true, VQa),
+ VQb
+ end, VQ5, lists:reverse(Acks)),
+ {empty, VQ7} = rabbit_variable_queue:fetch(true, VQ6),
+ VQ7.
+
test_variable_queue_ack_limiting(VQ0) ->
%% start by sending in a bunch of messages
Len = 1024,
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 78d26c510e..d4f51f8d4a 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -158,19 +158,18 @@
%% to search through qi segments looking for messages that are yet to
%% be acknowledged.
%%
-%% Pending acks are recorded in memory either as the tuple {SeqId,
-%% MsgId, MsgProps} (tuple-form) or as the message itself (message-
-%% form). Acks for persistent messages are always stored in the tuple-
-%% form. Acks for transient messages are also stored in tuple-form if
-%% the message has been sent to disk as part of the memory reduction
-%% process. For transient messages that haven't already been written
-%% to disk, acks are stored in message-form.
+%% Pending acks are recorded in memory by storing the message itself.
+%% If the message has been sent to disk, we do not store the message
+%% content. During memory reduction, pending acks containing message
+%% content have that content removed and the corresponding messages
+%% are pushed out to disk.
%%
-%% During memory reduction, acks stored in message-form are converted
-%% to tuple-form, and the corresponding messages are pushed out to
-%% disk.
+%% Messages from pending acks are returned to q4, q3 and delta during
+%% requeue, based on the limits of seq_id contained in each. Requeued
+%% messages retain their original seq_id, maintaining order
+%% when requeued.
%%
-%% The order in which alphas are pushed to betas and message-form acks
+%% The order in which alphas are pushed to betas and pending acks
%% are pushed to disk is determined dynamically. We always prefer to
%% push messages for the source (alphas or acks) that is growing the
%% fastest (with growth measured as avg. ingress - avg. egress). In
@@ -281,6 +280,8 @@
end_seq_id %% end_seq_id is exclusive
}).
+-record(merge_funs, {new, join, out, in, publish}).
+
%% When we discover, on publish, that we should write some indices to
%% disk for some betas, the IO_BATCH_SIZE sets the number of betas
%% that we must be due to write indices for before we do any work at
@@ -434,7 +435,7 @@ terminate(_Reason, State) ->
State1 = #vqstate { persistent_count = PCount,
index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(true, State),
+ purge_pending_ack(true, State),
PRef = case MSCStateP of
undefined -> undefined;
_ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
@@ -450,12 +451,12 @@ terminate(_Reason, State) ->
%% needs to delete everything that's been delivered and not ack'd.
delete_and_terminate(_Reason, State) ->
%% TODO: there is no need to interact with qi at all - which we do
- %% as part of 'purge' and 'remove_pending_ack', other than
+ %% as part of 'purge' and 'purge_pending_ack', other than
%% deleting it.
{_PurgeCount, State1} = purge(State),
State2 = #vqstate { index_state = IndexState,
msg_store_clients = {MSCStateP, MSCStateT} } =
- remove_pending_ack(false, State1),
+ purge_pending_ack(false, State1),
IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
case MSCStateP of
undefined -> ok;
@@ -558,34 +559,51 @@ fetch(AckRequired, State) ->
{Res, a(State3)}
end.
+ack([], State) ->
+ {[], State};
ack(AckTags, State) ->
- {MsgIds, State1} = ack(fun msg_store_remove/3,
- fun (_, State0) -> State0 end,
- AckTags, State),
- {MsgIds, a(State1)}.
-
-requeue(AckTags, MsgPropsFun, State) ->
- MsgPropsFun1 = fun (MsgProps) ->
- (MsgPropsFun(MsgProps)) #message_properties {
- needs_confirming = false }
- end,
- {MsgIds, State1} =
- ack(fun (_, _, _) -> ok end,
- fun (#msg_status { msg = Msg, msg_props = MsgProps }, State1) ->
- {_SeqId, State2} = publish(Msg, MsgPropsFun1(MsgProps),
- true, false, State1),
- State2;
- ({IsPersistent, MsgId, MsgProps}, State1) ->
- #vqstate { msg_store_clients = MSCState } = State1,
- {{ok, Msg = #basic_message{}}, MSCState1} =
- msg_store_read(MSCState, IsPersistent, MsgId),
- State2 = State1 #vqstate { msg_store_clients = MSCState1 },
- {_SeqId, State3} = publish(Msg, MsgPropsFun1(MsgProps),
- true, true, State2),
- State3
- end,
- AckTags, State),
- {MsgIds, a(reduce_memory_use(State1))}.
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ persistent_count = PCount,
+ ack_out_counter = AckOutCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2}) ->
+ {MsgStatus, State3} = remove_pending_ack(SeqId, State2),
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+ || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
+ PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
+ orddict:new(), MsgIdsByStore)),
+ {lists:reverse(AllMsgIds),
+ a(State1 #vqstate { index_state = IndexState1,
+ persistent_count = PCount1,
+ ack_out_counter = AckOutCount + length(AckTags) })}.
+
+requeue(AckTags, MsgPropsFun, #vqstate { delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len } = State) ->
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ alpha_funs(),
+ MsgPropsFun, State),
+ {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
+ delta_limit(Delta),
+ beta_funs(),
+ MsgPropsFun, State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ MsgPropsFun, State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, a(reduce_memory_use(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount }))}.
len(#vqstate { len = Len }) -> Len.
@@ -782,6 +800,8 @@ msg_status(IsPersistent, SeqId, Msg = #basic_message { id = MsgId },
msg_on_disk = false, index_on_disk = false,
msg_props = MsgProps }.
+trim_msg_status(MsgStatus) -> MsgStatus #msg_status { msg = undefined }.
+
with_msg_store_state({MSCStateP, MSCStateT}, true, Fun) ->
{Result, MSCStateP1} = Fun(MSCStateP),
{Result, {MSCStateP1, MSCStateT}};
@@ -835,26 +855,30 @@ maybe_write_delivered(false, _SeqId, IndexState) ->
maybe_write_delivered(true, SeqId, IndexState) ->
rabbit_queue_index:deliver([SeqId], IndexState).
-betas_from_index_entries(List, TransientThreshold, IndexState) ->
+betas_from_index_entries(List, TransientThreshold, PA, IndexState) ->
{Filtered, Delivers, Acks} =
lists:foldr(
fun ({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered},
- {Filtered1, Delivers1, Acks1}) ->
+ {Filtered1, Delivers1, Acks1} = Acc) ->
case SeqId < TransientThreshold andalso not IsPersistent of
true -> {Filtered1,
cons_if(not IsDelivered, SeqId, Delivers1),
[SeqId | Acks1]};
- false -> {[m(#msg_status { msg = undefined,
- msg_id = MsgId,
- seq_id = SeqId,
- is_persistent = IsPersistent,
- is_delivered = IsDelivered,
- msg_on_disk = true,
- index_on_disk = true,
- msg_props = MsgProps
- }) | Filtered1],
- Delivers1,
- Acks1}
+ false -> case dict:is_key(SeqId, PA) of
+ false -> {[m(#msg_status {
+ seq_id = SeqId,
+ msg_id = MsgId,
+ msg = undefined,
+ is_persistent = IsPersistent,
+ is_delivered = IsDelivered,
+ msg_on_disk = true,
+ index_on_disk = true,
+ msg_props = MsgProps
+ }) | Filtered1],
+ Delivers1,
+ Acks1};
+ true -> Acc
+ end
end
end, {[], [], []}, List),
{bpqueue:from_list([{true, Filtered}]),
@@ -1015,11 +1039,10 @@ internal_fetch(AckRequired, MsgStatus = #msg_status {
end,
Ack = fun () -> rabbit_queue_index:ack([SeqId], IndexState1) end,
IndexState2 =
- case {AckRequired, MsgOnDisk, IndexOnDisk, IsPersistent} of
- {false, true, false, _} -> Rem(), IndexState1;
- {false, true, true, _} -> Rem(), Ack();
- { true, true, true, false} -> Ack();
- _ -> IndexState1
+ case {AckRequired, MsgOnDisk, IndexOnDisk} of
+ {false, true, false} -> Rem(), IndexState1;
+ {false, true, true} -> Rem(), Ack();
+ _ -> IndexState1
end,
%% 3. If an ack is required, add something sensible to PA
@@ -1167,15 +1190,13 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus,
record_pending_ack(#msg_status { seq_id = SeqId,
msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_on_disk = MsgOnDisk,
- msg_props = MsgProps } = MsgStatus,
+ msg_on_disk = MsgOnDisk } = MsgStatus,
State = #vqstate { pending_ack = PA,
ram_ack_index = RAI,
ack_in_counter = AckInCount}) ->
{AckEntry, RAI1} =
case MsgOnDisk of
- true -> {{IsPersistent, MsgId, MsgProps}, RAI};
+ true -> {m(trim_msg_status(MsgStatus)), RAI};
false -> {MsgStatus, gb_trees:insert(SeqId, MsgId, RAI)}
end,
PA1 = dict:store(SeqId, AckEntry, PA),
@@ -1183,12 +1204,20 @@ record_pending_ack(#msg_status { seq_id = SeqId,
ram_ack_index = RAI1,
ack_in_counter = AckInCount + 1}.
-remove_pending_ack(KeepPersistent,
- State = #vqstate { pending_ack = PA,
- index_state = IndexState,
- msg_store_clients = MSCState }) ->
- {PersistentSeqIds, MsgIdsByStore, _AllMsgIds} =
- dict:fold(fun accumulate_ack/3, accumulate_ack_init(), PA),
+remove_pending_ack(SeqId, State = #vqstate { pending_ack = PA,
+ ram_ack_index = RAI }) ->
+ {dict:fetch(SeqId, PA),
+ State #vqstate { pending_ack = dict:erase(SeqId, PA),
+ ram_ack_index = gb_trees:delete_any(SeqId, RAI) }}.
+
+purge_pending_ack(KeepPersistent,
+ State = #vqstate { pending_ack = PA,
+ index_state = IndexState,
+ msg_store_clients = MSCState }) ->
+ {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
+ dict:fold(fun (_SeqId, MsgStatus, Acc) ->
+ accumulate_ack(MsgStatus, Acc)
+ end, accumulate_ack_init(), PA),
State1 = State #vqstate { pending_ack = dict:new(),
ram_ack_index = gb_trees:empty() },
case KeepPersistent of
@@ -1199,52 +1228,25 @@ remove_pending_ack(KeepPersistent,
State1
end;
false -> IndexState1 =
- rabbit_queue_index:ack(PersistentSeqIds, IndexState),
+ rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
[ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
|| {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
State1 #vqstate { index_state = IndexState1 }
end.
-ack(_MsgStoreFun, _Fun, [], State) ->
- {[], State};
-ack(MsgStoreFun, Fun, AckTags, State) ->
- {{PersistentSeqIds, MsgIdsByStore, AllMsgIds},
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- persistent_count = PCount,
- ack_out_counter = AckOutCount }} =
- lists:foldl(
- fun (SeqId, {Acc, State2 = #vqstate { pending_ack = PA,
- ram_ack_index = RAI }}) ->
- AckEntry = dict:fetch(SeqId, PA),
- {accumulate_ack(SeqId, AckEntry, Acc),
- Fun(AckEntry, State2 #vqstate {
- pending_ack = dict:erase(SeqId, PA),
- ram_ack_index =
- gb_trees:delete_any(SeqId, RAI)})}
- end, {accumulate_ack_init(), State}, AckTags),
- IndexState1 = rabbit_queue_index:ack(PersistentSeqIds, IndexState),
- [ok = MsgStoreFun(MSCState, IsPersistent, MsgIds)
- || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)],
- PCount1 = PCount - find_persistent_count(sum_msg_ids_by_store_to_len(
- orddict:new(), MsgIdsByStore)),
- {lists:reverse(AllMsgIds),
- State1 #vqstate { index_state = IndexState1,
- persistent_count = PCount1,
- ack_out_counter = AckOutCount + length(AckTags) }}.
-
accumulate_ack_init() -> {[], orddict:new(), []}.
-accumulate_ack(_SeqId, #msg_status { is_persistent = false, %% ASSERTIONS
- msg_on_disk = false,
- index_on_disk = false,
- msg_id = MsgId },
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {PersistentSeqIdsAcc, MsgIdsByStore, [MsgId | AllMsgIds]};
-accumulate_ack(SeqId, {IsPersistent, MsgId, _MsgProps},
- {PersistentSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
- {cons_if(IsPersistent, SeqId, PersistentSeqIdsAcc),
- rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore),
+accumulate_ack(#msg_status { seq_id = SeqId,
+ msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_on_disk = MsgOnDisk,
+ index_on_disk = IndexOnDisk },
+ {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+ {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
+ case MsgOnDisk of
+ true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+ false -> MsgIdsByStore
+ end,
[MsgId | AllMsgIds]}.
find_persistent_count(LensByStore) ->
@@ -1319,6 +1321,122 @@ msg_indices_written_to_disk(Callback, MsgIdSet) ->
end).
%%----------------------------------------------------------------------------
+%% Internal plumbing for requeue
+%%----------------------------------------------------------------------------
+
+alpha_funs() ->
+ #merge_funs {
+ new = fun queue:new/0,
+ join = fun queue:join/2,
+ out = fun queue:out/1,
+ in = fun queue:in/2,
+ publish = fun (#msg_status { msg = undefined } = MsgStatus, State) ->
+ read_msg(MsgStatus, State);
+ (MsgStatus, #vqstate {
+ ram_msg_count = RamMsgCount } = State) ->
+ {MsgStatus, State #vqstate {
+ ram_msg_count = RamMsgCount + 1 }}
+ end}.
+
+beta_funs() ->
+ #merge_funs {
+ new = fun bpqueue:new/0,
+ join = fun bpqueue:join/2,
+ out = fun (Q) ->
+ case bpqueue:out(Q) of
+ {{value, _IndexOnDisk, MsgStatus}, Q1} ->
+ {{value, MsgStatus}, Q1};
+ {empty, _Q1} = X ->
+ X
+ end
+ end,
+ in = fun (#msg_status { index_on_disk = IOD } = MsgStatus, Q) ->
+ bpqueue:in(IOD, MsgStatus, Q)
+ end,
+ publish = fun (#msg_status { msg_on_disk = MsgOnDisk } = MsgStatus,
+ State) ->
+ {#msg_status { index_on_disk = IndexOnDisk,
+ msg = Msg} = MsgStatus1,
+ #vqstate { ram_index_count = RamIndexCount,
+ ram_msg_count = RamMsgCount } =
+ State1} =
+ maybe_write_to_disk(not MsgOnDisk, false,
+ MsgStatus, State),
+ {MsgStatus1, State1 #vqstate {
+ ram_msg_count = RamMsgCount +
+ one_if(Msg =/= undefined),
+ ram_index_count = RamIndexCount +
+ one_if(not IndexOnDisk) }}
+ end}.
+
+%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, #merge_funs { new = QNew } = Funs,
+ MsgPropsFun, State) ->
+ queue_merge(SeqIds, Q, QNew(), MsgIds, Limit, Funs, MsgPropsFun, State).
+
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit,
+ #merge_funs { out = QOut, in = QIn, publish = QPublish } = Funs,
+ MsgPropsFun, State)
+ when Limit == undefined orelse SeqId < Limit ->
+ case QOut(Q) of
+ {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+ when SeqIdQ < SeqId ->
+ %% enqueue from the remaining queue
+ queue_merge(SeqIds, Q1, QIn(MsgStatus, Front), MsgIds,
+ Limit, Funs, MsgPropsFun, State);
+ {_, _Q1} ->
+ %% enqueue from the remaining list of sequence ids
+ {MsgStatus, State1} = msg_from_pending_ack(SeqId, MsgPropsFun,
+ State),
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ QPublish(MsgStatus, State1),
+ queue_merge(Rest, Q, QIn(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, Funs, MsgPropsFun, State2)
+ end;
+queue_merge(SeqIds, Q, Front, MsgIds, _Limit, #merge_funs { join = QJoin },
+ _MsgPropsFun, State) ->
+ {SeqIds, QJoin(Front, Q), MsgIds, State}.
+
+delta_merge([], Delta, MsgIds, _MsgPropsFun, State) ->
+ {Delta, MsgIds, State};
+delta_merge(SeqIds, #delta { start_seq_id = StartSeqId,
+ count = Count,
+ end_seq_id = EndSeqId} = Delta,
+ MsgIds, MsgPropsFun, State) ->
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
+ {#msg_status { msg_id = MsgId,
+ index_on_disk = IndexOnDisk,
+ msg_on_disk = MsgOnDisk} = MsgStatus,
+ State1} =
+ msg_from_pending_ack(SeqId, MsgPropsFun, State0),
+ {_MsgStatus, State2} =
+ maybe_write_to_disk(not MsgOnDisk, not IndexOnDisk,
+ MsgStatus, State1),
+ {Delta0 #delta {
+ start_seq_id = lists:min([SeqId, StartSeqId]),
+ count = Count + 1,
+ end_seq_id = lists:max([SeqId + 1, EndSeqId]) },
+ [MsgId | MsgIds0], State2}
+ end, {Delta, MsgIds, State}, SeqIds).
+
+%% Mostly opposite of record_pending_ack/2
+msg_from_pending_ack(SeqId, MsgPropsFun, State) ->
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
+ remove_pending_ack(SeqId, State),
+ {MsgStatus #msg_status {
+ msg_props = (MsgPropsFun(MsgProps)) #message_properties {
+ needs_confirming = false } }, State1}.
+
+beta_limit(BPQ) ->
+ case bpqueue:out(BPQ) of
+ {{value, _Prefix, #msg_status { seq_id = SeqId }}, _BPQ} -> SeqId;
+ {empty, _BPQ} -> undefined
+ end.
+
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+
+%%----------------------------------------------------------------------------
%% Phase changes
%%----------------------------------------------------------------------------
@@ -1394,12 +1512,11 @@ limit_ram_acks(Quota, State = #vqstate { pending_ack = PA,
{Quota, State};
false ->
{SeqId, MsgId, RAI1} = gb_trees:take_largest(RAI),
- MsgStatus = #msg_status {
- msg_id = MsgId, %% ASSERTION
- is_persistent = false, %% ASSERTION
- msg_props = MsgProps } = dict:fetch(SeqId, PA),
- {_, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
- PA1 = dict:store(SeqId, {false, MsgId, MsgProps}, PA),
+ MsgStatus = #msg_status { msg_id = MsgId, is_persistent = false} =
+ dict:fetch(SeqId, PA),
+ {MsgStatus1, State1} =
+ maybe_write_to_disk(true, false, MsgStatus, State),
+ PA1 = dict:store(SeqId, m(trim_msg_status(MsgStatus1)), PA),
limit_ram_acks(Quota - 1,
State1 #vqstate { pending_ack = PA1,
ram_ack_index = RAI1 })
@@ -1505,6 +1622,7 @@ maybe_deltas_to_betas(State = #vqstate {
delta = Delta,
q3 = Q3,
index_state = IndexState,
+ pending_ack = PA,
transient_threshold = TransientThreshold }) ->
#delta { start_seq_id = DeltaSeqId,
count = DeltaCount,
@@ -1515,7 +1633,7 @@ maybe_deltas_to_betas(State = #vqstate {
{List, IndexState1} =
rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1, IndexState),
{Q3a, IndexState2} =
- betas_from_index_entries(List, TransientThreshold, IndexState1),
+ betas_from_index_entries(List, TransientThreshold, PA, IndexState1),
State1 = State #vqstate { index_state = IndexState2 },
case bpqueue:len(Q3a) of
0 ->
@@ -1587,9 +1705,9 @@ maybe_push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
State1 = #vqstate { ram_msg_count = RamMsgCount,
ram_index_count = RamIndexCount }} =
maybe_write_to_disk(true, false, MsgStatus, State),
- MsgStatus2 = m(MsgStatus1 #msg_status { msg = undefined }),
+ MsgStatus2 = m(trim_msg_status(MsgStatus1)),
RamIndexCount1 = RamIndexCount + one_if(not IndexOnDisk),
- State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
+ State2 = State1 #vqstate { ram_msg_count = RamMsgCount - 1,
ram_index_count = RamIndexCount1 },
maybe_push_alphas_to_betas(Generator, Consumer, Quota - 1, Qa,
Consumer(MsgStatus2, Qa, State2))