diff options
| author | kjnilsson <knilsson@pivotal.io> | 2018-12-17 16:21:59 +0000 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2018-12-17 16:21:59 +0000 |
| commit | 8636e72a0f7500b6b78bc2dbb4ddc92f2d5bd3e5 (patch) | |
| tree | 00ccab451dc1421012f29ed95761d73a90bb4964 /src | |
| parent | 2695a45a5d69a59d79e5116b5412059ef7428961 (diff) | |
| download | rabbitmq-server-git-8636e72a0f7500b6b78bc2dbb4ddc92f2d5bd3e5.tar.gz | |
Quorum queue: return messages in order
Fixes bug that muddled up the checkout order when a consumer is
cancelled with more than 32 messages checked out.
Dialyzer fixes.
[#162698673]
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 57 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 14 | ||||
| -rw-r--r-- | src/rabbit_quorum_memory_manager.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 16 |
5 files changed, 60 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f749d9f30e..996774fe35 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, consumer_mapping = ConsumerMapping} = State0) -> case QueueStates of #{Name := QState0} -> + QName = rabbit_quorum_queue:queue_name(QState0), case rabbit_quorum_queue:handle_event(Evt, QState0) of {{delivery, CTag, Msgs}, QState1} -> AckRequired = case maps:find(CTag, ConsumerMapping) of @@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, true -> QState1 end, - QName = rabbit_quorum_queue:queue_name(QState2), State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), @@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, %% TODO: this should use dtree:take/3 {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(Name, QNames) of - {ok, QName} -> erase_queue_stats(QName); - error -> ok - end, + erase_queue_stats(QName), noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), queue_names = maps:remove(Name, QNames)}) diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d365d41a96..376ab5e5a2 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -56,7 +56,7 @@ make_discard/2, make_credit/4, make_purge/0, - make_update_state/1 + make_update_config/1 ]). -type raw_msg() :: term(). @@ -131,7 +131,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). --record(update_state, {config :: config()}). +-record(update_config, {config :: config()}). @@ -143,7 +143,7 @@ #discard{} | #credit{} | #purge{} | - #update_state{}. + #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). %% all the command types suppored by ra fifo @@ -260,10 +260,10 @@ -spec init(config()) -> state(). init(#{name := Name, queue_resource := Resource} = Conf) -> - update_state(Conf, #state{name = Name, + update_config(Conf, #state{name = Name, queue_resource = Resource}). -update_state(Conf, State) -> +update_config(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), @@ -515,8 +515,8 @@ apply(_, {nodeup, Node}, Effects0, service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> {State, Effects, ok}; -apply(_, #update_state{config = Conf}, Effects, State) -> - {update_state(Conf, State), Effects, ok}. +apply(_, #update_config{config = Conf}, Effects, State) -> + {update_config(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -900,12 +900,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, State0#state{messages = maps:put(MsgNum, Msg, Messages), returns = lqueue:in(MsgNum, Returns)}). -return_all(State, Checked) -> - maps:fold(fun (_, '$prefix_msg', S) -> - return_one(0, '$prefix_msg', S); - (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, State, Checked). +return_all(State, Checked0) -> + %% need to sort the list so that we return messages in the order + %% they were checked out + Checked = lists:sort(maps:to_list(Checked0)), + lists:foldl(fun ({_, '$prefix_msg'}, S) -> + return_one(0, '$prefix_msg', S); + ({_, {MsgNum, Msg}}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -1170,9 +1173,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. --spec make_update_state(config()) -> protocol(). -make_update_state(Config) -> - #update_state{config = Config}. +-spec make_update_config(config()) -> protocol(). +make_update_config(Config) -> + #update_config{config = Config}. add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) -> Bytes = message_size(Msg), @@ -1865,6 +1868,26 @@ purge_with_checkout_test() -> ?assertEqual(0, maps:size(Checked)), ok. +down_returns_checked_out_in_order_test() -> + S0 = test_init(?FUNCTION_NAME), + %% enqueue 100 + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, 100)), + ?assertEqual(100, maps:size(S1#state.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers), + ?assertEqual(100, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2), + Returns = lqueue:to_list(S#state.returns), + ?assertEqual(100, length(Returns)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + meta(Idx) -> #{index => Idx, term => 1}. @@ -1900,7 +1923,7 @@ check_auto(Cid, Idx, State) -> check(Cid, Idx, Num, State) -> strip_reply( apply(meta(Idx), - make_checkout(Cid, {once, Num, simple_prefetch}, #{}), + make_checkout(Cid, {auto, Num, simple_prefetch}, #{}), [], State)). settle(Cid, Idx, MsgId, State) -> diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 5e5ad105e4..04a82c0e5e 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -52,10 +52,12 @@ {rabbit_fifo:consumer_tag(), non_neg_integer()}}. -type actions() :: [action()]. +-type cluster_name() :: rabbit_types:r(queue). + -record(consumer, {last_msg_id :: seq() | -1, delivery_count = 0 :: non_neg_integer()}). --record(state, {cluster_name :: ra_cluster_name(), +-record(state, {cluster_name :: cluster_name(), servers = [] :: [ra_server_id()], leader :: maybe(ra_server_id()), next_seq = 0 :: seq(), @@ -88,7 +90,7 @@ %% @param ClusterName the id of the cluster to interact with %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. --spec init(ra_cluster_name(), [ra_server_id()]) -> state(). +-spec init(cluster_name(), [ra_server_id()]) -> state(). init(ClusterName, Servers) -> init(ClusterName, Servers, ?SOFT_LIMIT). @@ -98,7 +100,7 @@ init(ClusterName, Servers) -> %% @param Servers The known servers of the queue. If the current leader is known %% ensure the leader node is at the head of the list. %% @param MaxPending size defining the max number of pending commands. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). +-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, #state{cluster_name = ClusterName, @@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) -> soft_limit = SoftLimit, timeout = Timeout}. --spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), +-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok), fun(() -> ok)) -> state(). init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) -> Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, @@ -397,12 +399,12 @@ purge(Node) -> end. %% @doc returns the cluster name --spec cluster_name(state()) -> ra_cluster_name(). +-spec cluster_name(state()) -> cluster_name(). cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. update_machine_state(Node, Conf) -> - case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of + case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of {ok, ok, _} -> ok; Err -> diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl index 347f7f205e..f567561f31 100644 --- a/src/rabbit_quorum_memory_manager.erl +++ b/src/rabbit_quorum_memory_manager.erl @@ -15,7 +15,7 @@ %% -module(rabbit_quorum_memory_manager). --include("rabbit.hrl"). +-include_lib("rabbit_common/include/rabbit.hrl"). -export([init/1, handle_call/2, handle_event/2, handle_info/2, terminate/2, code_change/3]). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index 963ef7df01..53dd2e2a7d 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -54,10 +54,6 @@ {'ok', rabbit_fifo_client:state()}. -spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. --spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(), - rabbit_fifo_client:state()) -> - {'ok', 'empty', rabbit_fifo_client:state()} | - {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. -spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) -> {'ok', rabbit_fifo_client:state()}. -spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'. @@ -149,9 +145,11 @@ declare(#amqqueue{name = QName, ra_machine(Q) -> {module, rabbit_fifo, ra_machine_config(Q)}. -ra_machine_config(Q = #amqqueue{name = QName}) -> - #{dead_letter_handler => dlx_mfa(Q), +ra_machine_config(Q = #amqqueue{name = QName, + pid = {Name, _}}) -> + #{name => Name, queue_resource => QName, + dead_letter_handler => dlx_mfa(Q), become_leader_handler => {?MODULE, become_leader, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}) -> @@ -278,7 +276,7 @@ stop(VHost) -> _ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)], ok. --spec delete(rabbit_types:amqqueue(), +-spec delete(#amqqueue{}, boolean(), boolean(), rabbit_types:username()) -> {ok, QLen :: non_neg_integer()}. @@ -333,6 +331,10 @@ reject(false, CTag, MsgIds, QState) -> credit(CTag, Credit, Drain, QState) -> rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState). +-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(), + rabbit_fifo_client:state()) -> + {'ok', 'empty', rabbit_fifo_client:state()} | + {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}. basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, CTag0, QState0) -> CTag = quorum_ctag(CTag0), |
