summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2018-12-17 16:21:59 +0000
committerkjnilsson <knilsson@pivotal.io>2018-12-17 16:21:59 +0000
commit8636e72a0f7500b6b78bc2dbb4ddc92f2d5bd3e5 (patch)
tree00ccab451dc1421012f29ed95761d73a90bb4964 /src
parent2695a45a5d69a59d79e5116b5412059ef7428961 (diff)
downloadrabbitmq-server-git-8636e72a0f7500b6b78bc2dbb4ddc92f2d5bd3e5.tar.gz
Quorum queue: return messages in order
Fixes bug that muddled up the checkout order when a consumer is cancelled with more than 32 messages checked out. Dialyzer fixes. [#162698673]
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl7
-rw-r--r--src/rabbit_fifo.erl57
-rw-r--r--src/rabbit_fifo_client.erl14
-rw-r--r--src/rabbit_quorum_memory_manager.erl2
-rw-r--r--src/rabbit_quorum_queue.erl16
5 files changed, 60 insertions, 36 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f749d9f30e..996774fe35 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -654,6 +654,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
consumer_mapping = ConsumerMapping} = State0) ->
case QueueStates of
#{Name := QState0} ->
+ QName = rabbit_quorum_queue:queue_name(QState0),
case rabbit_quorum_queue:handle_event(Evt, QState0) of
{{delivery, CTag, Msgs}, QState1} ->
AckRequired = case maps:find(CTag, ConsumerMapping) of
@@ -670,7 +671,6 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
true ->
QState1
end,
- QName = rabbit_quorum_queue:queue_name(QState2),
State = lists:foldl(
fun({MsgId, {MsgHeader, Msg}}, Acc) ->
IsDelivered = maps:is_key(delivery_count, MsgHeader),
@@ -702,10 +702,7 @@ handle_info({ra_event, {Name, _} = From, _} = Evt,
%% TODO: this should use dtree:take/3
{MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed),
State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}),
- case maps:find(Name, QNames) of
- {ok, QName} -> erase_queue_stats(QName);
- error -> ok
- end,
+ erase_queue_stats(QName),
noreply_coalesce(
State3#ch{queue_states = maps:remove(Name, QueueStates),
queue_names = maps:remove(Name, QNames)})
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d365d41a96..376ab5e5a2 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -56,7 +56,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
- make_update_state/1
+ make_update_config/1
]).
-type raw_msg() :: term().
@@ -131,7 +131,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
--record(update_state, {config :: config()}).
+-record(update_config, {config :: config()}).
@@ -143,7 +143,7 @@
#discard{} |
#credit{} |
#purge{} |
- #update_state{}.
+ #update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
%% all the command types suppored by ra fifo
@@ -260,10 +260,10 @@
-spec init(config()) -> state().
init(#{name := Name,
queue_resource := Resource} = Conf) ->
- update_state(Conf, #state{name = Name,
+ update_config(Conf, #state{name = Name,
queue_resource = Resource}).
-update_state(Conf, State) ->
+update_config(Conf, State) ->
DLH = maps:get(dead_letter_handler, Conf, undefined),
BLH = maps:get(become_leader_handler, Conf, undefined),
SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL),
@@ -515,8 +515,8 @@ apply(_, {nodeup, Node}, Effects0,
service_queue = SQ}, Monitors ++ Effects);
apply(_, {nodedown, _Node}, Effects, State) ->
{State, Effects, ok};
-apply(_, #update_state{config = Conf}, Effects, State) ->
- {update_state(Conf, State), Effects, ok}.
+apply(_, #update_config{config = Conf}, Effects, State) ->
+ {update_config(Conf, State), Effects, ok}.
-spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects().
state_enter(leader, #state{consumers = Cons,
@@ -900,12 +900,15 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}},
State0#state{messages = maps:put(MsgNum, Msg, Messages),
returns = lqueue:in(MsgNum, Returns)}).
-return_all(State, Checked) ->
- maps:fold(fun (_, '$prefix_msg', S) ->
- return_one(0, '$prefix_msg', S);
- (_, {MsgNum, Msg}, S) ->
- return_one(MsgNum, Msg, S)
- end, State, Checked).
+return_all(State, Checked0) ->
+ %% need to sort the list so that we return messages in the order
+ %% they were checked out
+ Checked = lists:sort(maps:to_list(Checked0)),
+ lists:foldl(fun ({_, '$prefix_msg'}, S) ->
+ return_one(0, '$prefix_msg', S);
+ ({_, {MsgNum, Msg}}, S) ->
+ return_one(MsgNum, Msg, S)
+ end, State, Checked).
checkout(State, Effects) ->
checkout0(checkout_one(State), Effects, #{}).
@@ -1170,9 +1173,9 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
--spec make_update_state(config()) -> protocol().
-make_update_state(Config) ->
- #update_state{config = Config}.
+-spec make_update_config(config()) -> protocol().
+make_update_config(Config) ->
+ #update_config{config = Config}.
add_bytes_enqueue(Msg, #state{msg_bytes_enqueue = Enqueue} = State) ->
Bytes = message_size(Msg),
@@ -1865,6 +1868,26 @@ purge_with_checkout_test() ->
?assertEqual(0, maps:size(Checked)),
ok.
+down_returns_checked_out_in_order_test() ->
+ S0 = test_init(?FUNCTION_NAME),
+ %% enqueue 100
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, 100)),
+ ?assertEqual(100, maps:size(S1#state.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#state.consumers),
+ ?assertEqual(100, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noproc}, [], S2),
+ Returns = lqueue:to_list(S#state.returns),
+ ?assertEqual(100, length(Returns)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
@@ -1900,7 +1923,7 @@ check_auto(Cid, Idx, State) ->
check(Cid, Idx, Num, State) ->
strip_reply(
apply(meta(Idx),
- make_checkout(Cid, {once, Num, simple_prefetch}, #{}),
+ make_checkout(Cid, {auto, Num, simple_prefetch}, #{}),
[], State)).
settle(Cid, Idx, MsgId, State) ->
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 5e5ad105e4..04a82c0e5e 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -52,10 +52,12 @@
{rabbit_fifo:consumer_tag(), non_neg_integer()}}.
-type actions() :: [action()].
+-type cluster_name() :: rabbit_types:r(queue).
+
-record(consumer, {last_msg_id :: seq() | -1,
delivery_count = 0 :: non_neg_integer()}).
--record(state, {cluster_name :: ra_cluster_name(),
+-record(state, {cluster_name :: cluster_name(),
servers = [] :: [ra_server_id()],
leader :: maybe(ra_server_id()),
next_seq = 0 :: seq(),
@@ -88,7 +90,7 @@
%% @param ClusterName the id of the cluster to interact with
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
--spec init(ra_cluster_name(), [ra_server_id()]) -> state().
+-spec init(cluster_name(), [ra_server_id()]) -> state().
init(ClusterName, Servers) ->
init(ClusterName, Servers, ?SOFT_LIMIT).
@@ -98,7 +100,7 @@ init(ClusterName, Servers) ->
%% @param Servers The known servers of the queue. If the current leader is known
%% ensure the leader node is at the head of the list.
%% @param MaxPending size defining the max number of pending commands.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer()) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
#state{cluster_name = ClusterName,
@@ -106,7 +108,7 @@ init(ClusterName = #resource{}, Servers, SoftLimit) ->
soft_limit = SoftLimit,
timeout = Timeout}.
--spec init(ra_cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
+-spec init(cluster_name(), [ra_server_id()], non_neg_integer(), fun(() -> ok),
fun(() -> ok)) -> state().
init(ClusterName = #resource{}, Servers, SoftLimit, BlockFun, UnblockFun) ->
Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000,
@@ -397,12 +399,12 @@ purge(Node) ->
end.
%% @doc returns the cluster name
--spec cluster_name(state()) -> ra_cluster_name().
+-spec cluster_name(state()) -> cluster_name().
cluster_name(#state{cluster_name = ClusterName}) ->
ClusterName.
update_machine_state(Node, Conf) ->
- case ra:process_command(Node, rabbit_fifo:make_update_state(Conf)) of
+ case ra:process_command(Node, rabbit_fifo:make_update_config(Conf)) of
{ok, ok, _} ->
ok;
Err ->
diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl
index 347f7f205e..f567561f31 100644
--- a/src/rabbit_quorum_memory_manager.erl
+++ b/src/rabbit_quorum_memory_manager.erl
@@ -15,7 +15,7 @@
%%
-module(rabbit_quorum_memory_manager).
--include("rabbit.hrl").
+-include_lib("rabbit_common/include/rabbit.hrl").
-export([init/1, handle_call/2, handle_event/2, handle_info/2,
terminate/2, code_change/3]).
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 963ef7df01..53dd2e2a7d 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -54,10 +54,6 @@
{'ok', rabbit_fifo_client:state()}.
-spec reject(Confirm :: boolean(), rabbit_types:ctag(), [msg_id()], rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
--spec basic_get(rabbit_types:amqqueue(), NoAck :: boolean(), rabbit_types:ctag(),
- rabbit_fifo_client:state()) ->
- {'ok', 'empty', rabbit_fifo_client:state()} |
- {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
-spec stateless_deliver(ra_server_id(), rabbit_types:delivery()) -> 'ok'.
@@ -149,9 +145,11 @@ declare(#amqqueue{name = QName,
ra_machine(Q) ->
{module, rabbit_fifo, ra_machine_config(Q)}.
-ra_machine_config(Q = #amqqueue{name = QName}) ->
- #{dead_letter_handler => dlx_mfa(Q),
+ra_machine_config(Q = #amqqueue{name = QName,
+ pid = {Name, _}}) ->
+ #{name => Name,
queue_resource => QName,
+ dead_letter_handler => dlx_mfa(Q),
become_leader_handler => {?MODULE, become_leader, [QName]}}.
cancel_consumer_handler(QName, {ConsumerTag, ChPid}) ->
@@ -278,7 +276,7 @@ stop(VHost) ->
_ = [ra:stop_server(Pid) || #amqqueue{pid = Pid} <- find_quorum_queues(VHost)],
ok.
--spec delete(rabbit_types:amqqueue(),
+-spec delete(#amqqueue{},
boolean(), boolean(),
rabbit_types:username()) ->
{ok, QLen :: non_neg_integer()}.
@@ -333,6 +331,10 @@ reject(false, CTag, MsgIds, QState) ->
credit(CTag, Credit, Drain, QState) ->
rabbit_fifo_client:credit(quorum_ctag(CTag), Credit, Drain, QState).
+-spec basic_get(#amqqueue{}, NoAck :: boolean(), rabbit_types:ctag(),
+ rabbit_fifo_client:state()) ->
+ {'ok', 'empty', rabbit_fifo_client:state()} |
+ {'ok', QLen :: non_neg_integer(), qmsg(), rabbit_fifo_client:state()}.
basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck,
CTag0, QState0) ->
CTag = quorum_ctag(CTag0),