diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-11-02 09:07:11 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-11-02 09:15:54 +0000 |
| commit | 9a2ea744ab56fe1caf18c0a70ea850d4a65745a3 (patch) | |
| tree | 8c7d8549b5724d54756bb97d7532fb98cdf515f0 /src | |
| parent | 9dcaed2632801c1849cf28be91c71d496f186c3e (diff) | |
| download | rabbitmq-server-git-9a2ea744ab56fe1caf18c0a70ea850d4a65745a3.tar.gz | |
rabbit_fifo: clarify prefix_msg_count
And fix two eunit tests.
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_fifo.erl | 42 |
1 files changed, 30 insertions, 12 deletions
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 178642dc9f..f53ffaa760 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -158,7 +158,7 @@ % enqueues % rabbit_fifo_index can be slow when calculating the smallest % index when there are large gaps but should be faster than gb_trees - % for normal appending operations - backed by a map + % for normal appending operations as it's backed by a map ra_indexes = rabbit_fifo_index:empty() :: rabbit_fifo_index:state(), % consumers need to reflect consumer state at time of snapshot % needs to be part of snapshot @@ -170,6 +170,16 @@ cancel_consumer_handler :: maybe(applied_mfa()), become_leader_handler :: maybe(applied_mfa()), metrics_handler :: maybe(applied_mfa()), + %% This is a special field that is only used for snapshots + %% It represents the number of queued messages at the time the + %% dehydrated snapshot state was cached. + %% As release_cursors are only emitted for raft indexes where all + %% prior messages no longer contribute to the current state we can + %% replace all message payloads at some index with a single integer + %% to be decremented during `checkout_one' until it's 0 after which + %% it instead takes messages from the `messages' map. + %% This is done so that consumers are still served in a deterministic + %% order on recovery. prefix_msg_count = 0 :: non_neg_integer() }). @@ -663,7 +673,7 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun(dummy, #state{prefix_msg_count = MsgCount} = S0) -> + State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) -> S0#state{prefix_msg_count = MsgCount + 1}; ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) @@ -833,7 +843,9 @@ take_next_msg(#state{prefix_msg_count = 0, end; take_next_msg(#state{prefix_msg_count = MsgCount, messages = Messages} = State) -> - {dummy, State, Messages, MsgCount - 1}. + %% there is still a prefix message count for the consumer + %% "fake" a '$prefix_msg' message + {'$prefix_msg', State, Messages, MsgCount - 1}. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. @@ -871,7 +883,7 @@ checkout_one(#state{service_queue = SQ0, prefix_msg_count = PrefMsgC, consumers = Cons}, Msg = case ConsumerMsg of - dummy -> dummy; + '$prefix_msg' -> '$prefix_msg'; {_, {_, M}} -> M end, {success, ConsumerId, Next, Msg, State}; @@ -957,6 +969,8 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, end. +%% creates a dehydrated version of the current state to be cached and +%% potentially used to for a snaphot at a later point dehydrate_state(#state{messages = Messages0, consumers = Consumers, prefix_msg_count = MsgCount} = State) -> @@ -1042,15 +1056,20 @@ credit_enq_enq_checkout_settled_credit_test() -> credit_with_drained_test() -> Cid = {?FUNCTION_NAME, self()}, State0 = test_init(test), + %% checkout with a single credit {State1, _, _} = apply(meta(1), {checkout, {auto, 1, credited}, Cid}, [], State0), - {State2, _} = credit(Cid, 2, 0, 5, false, State1), - {State, DrainedEffs} = credit(Cid, 3, 0, 5, true, State2), + ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 1, + delivery_count = 0}}}, + State1), + {State, _Effs, Result} = + apply(meta(3), {credit, 0, 5, true, Cid}, [], State1), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 5}}}, State), - ?ASSERT_EFF({send_msg, _, {send_drained, [{?FUNCTION_NAME, 5}]}, cast}, - DrainedEffs), + ?assertEqual({multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 5}]}]}, + Result), ok. credit_and_drain_test() -> @@ -1062,16 +1081,15 @@ credit_and_drain_test() -> apply(meta(3), {checkout, {auto, 0, credited}, Cid}, [], State2), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, CheckEffs), - {State4, Effects, {send_credit_reply, 0}} = - apply(meta(4), {credit, 4, 0, true, Cid}, [], State3), + {State4, Effects, {multi, [{send_credit_reply, 0}, + {send_drained, [{?FUNCTION_NAME, 2}]}]}} = + apply(meta(4), {credit, 4, 0, true, Cid}, [], State3), ?assertMatch(#state{consumers = #{Cid := #consumer{credit = 0, delivery_count = 4}}}, State4), ?ASSERT_EFF({send_msg, _, {delivery, _, [{_, {_, first}}, {_, {_, second}}]}, _}, Effects), - ?ASSERT_EFF({send_msg, _, {send_drained, [{?FUNCTION_NAME, 2}]}, cast}, - Effects), {_State5, EnqEffs} = enq(5, 2, third, State4), ?ASSERT_NO_EFF({send_msg, _, {delivery, _, _}}, EnqEffs), ok. |
