summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-11-02 09:07:11 +0000
committerkjnilsson <knilsson@pivotal.io>2018-11-02 09:15:54 +0000
commit9a2ea744ab56fe1caf18c0a70ea850d4a65745a3 (patch)
tree8c7d8549b5724d54756bb97d7532fb98cdf515f0 /src
parent9dcaed2632801c1849cf28be91c71d496f186c3e (diff)
downloadrabbitmq-server-git-9a2ea744ab56fe1caf18c0a70ea850d4a65745a3.tar.gz
rabbit_fifo: clarify prefix_msg_count
And fix two eunit tests.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_fifo.erl42
1 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 178642dc9f..f53ffaa760 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -158,7 +158,7 @@
% enqueues
% rabbit_fifo_index can be slow when calculating the smallest
% index when there are large gaps but should be faster than gb_trees
- % for normal appending operations - backed by a map
+ % for normal appending operations as it's backed by a map
ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(),
% consumers need to reflect consumer state at time of snapshot
% needs to be part of snapshot
@@ -170,6 +170,16 @@
cancel_consumer_handler :: maybe(applied_mfa()),
become_leader_handler :: maybe(applied_mfa()),
metrics_handler :: maybe(applied_mfa()),
+ %% This is a special field that is only used for snapshots
+ %% It represents the number of queued messages at the time the
+ %% dehydrated snapshot state was cached.
+ %% As release_cursors are only emitted for raft indexes where all
+ %% prior messages no longer contribute to the current state we can
+ %% replace all message payloads at some index with a single integer
+ %% to be decremented during `checkout_one' until it's 0 after which
+ %% it instead takes messages from the `messages' map.
+ %% This is done so that consumers are still served in a deterministic
+ %% order on recovery.
prefix_msg_count = 0 :: non_neg_integer()
}).
@@ -663,7 +673,7 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked,
end,
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
- State1 = lists:foldl(fun(dummy, #state{prefix_msg_count = MsgCount} = S0) ->
+ State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) ->
S0#state{prefix_msg_count = MsgCount + 1};
({MsgNum, Msg}, S0) ->
return_one(MsgNum, Msg, S0)
@@ -833,7 +843,9 @@ take_next_msg(#state{prefix_msg_count = 0,
end;
take_next_msg(#state{prefix_msg_count = MsgCount,
messages = Messages} = State) ->
- {dummy, State, Messages, MsgCount - 1}.
+ %% there is still a prefix message count for the consumer
+ %% "fake" a '$prefix_msg' message
+ {'$prefix_msg', State, Messages, MsgCount - 1}.
send_msg_effect({CTag, CPid}, Msgs) ->
{send_msg, CPid, {delivery, CTag, Msgs}, ra_event}.
@@ -871,7 +883,7 @@ checkout_one(#state{service_queue = SQ0,
prefix_msg_count = PrefMsgC,
consumers = Cons},
Msg = case ConsumerMsg of
- dummy -> dummy;
+ '$prefix_msg' -> '$prefix_msg';
{_, {_, M}} -> M
end,
{success, ConsumerId, Next, Msg, State};
@@ -957,6 +969,8 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
end.
+%% creates a dehydrated version of the current state to be cached and
+%% potentially used to for a snaphot at a later point
dehydrate_state(#state{messages = Messages0,
consumers = Consumers,
prefix_msg_count = MsgCount} = State) ->
@@ -1042,15 +1056,20 @@ credit_enq_enq_checkout_settled_credit_test() ->
credit_with_drained_test() ->
Cid = {?FUNCTION_NAME, self()},
State0 = test_init(test),
+ %% checkout with a single credit
{State1, _, _} =
apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0),
- {State2, _} = credit(Cid, 2, 0, 5, false, State1),
- {State, DrainedEffs} = credit(Cid, 3, 0, 5, true, State2),
+ ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1,
+ delivery_count = 0}}},
+ State1),
+ {State, _Effs, Result} =
+ apply(meta(3), {credit, 0, 5, true, Cid}, [], State1),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 5}}},
State),
- ?ASSERT_EFF({send_msg, _, {send_drained, [{?FUNCTION_NAME, 5}]}, cast},
- DrainedEffs),
+ ?assertEqual({multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 5}]}]},
+ Result),
ok.
credit_and_drain_test() ->
@@ -1062,16 +1081,15 @@ credit_and_drain_test() ->
apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs),
- {State4, Effects, {send_credit_reply, 0}} =
- apply(meta(4), {credit, 4, 0, true, Cid}, [], State3),
+ {State4, Effects, {multi, [{send_credit_reply, 0},
+ {send_drained, [{?FUNCTION_NAME, 2}]}]}} =
+ apply(meta(4), {credit, 4, 0, true, Cid}, [], State3),
?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0,
delivery_count = 4}}},
State4),
?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}},
{_, {_, second}}]}, _}, Effects),
- ?ASSERT_EFF({send_msg, _, {send_drained, [{?FUNCTION_NAME, 2}]}, cast},
- Effects),
{_State5, EnqEffs} = enq(5, 2, third, State4),
?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs),
ok.