summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorDiana Corbacho <diana@rabbitmq.com>2019-04-04 10:06:18 +0100
committerDiana Corbacho <diana@rabbitmq.com>2019-04-04 10:06:18 +0100
commit279a35b6aaf9e9b7ce7394500b9641eadfaebd0a (patch)
treef0f675e7a85e01cef62ba5c74f081bbf16452a74
parent19f8dec83bfa899cf7d43e532ca8a52c73213a97 (diff)
parent09ab089066e8148ebe30ce3183958ea039470826 (diff)
downloadrabbitmq-server-git-279a35b6aaf9e9b7ce7394500b9641eadfaebd0a.tar.gz
Merge remote-tracking branch 'origin/master' into in-memory-limits
-rw-r--r--scripts/rabbitmq-server.bat10
-rw-r--r--scripts/rabbitmq-service.bat10
-rw-r--r--src/rabbit_access_control.erl24
-rw-r--r--src/rabbit_auth_backend_internal.erl2
-rw-r--r--src/rabbit_core_ff.erl4
-rw-r--r--src/rabbit_direct.erl11
-rw-r--r--src/rabbit_disk_monitor.erl15
-rw-r--r--src/rabbit_fifo.erl365
-rw-r--r--src/rabbit_mnesia.erl3
-rw-r--r--src/rabbit_networking.erl1
-rw-r--r--src/rabbit_quorum_queue.erl39
-rw-r--r--src/rabbit_reader.erl2
-rw-r--r--src/rabbit_table.erl2
-rw-r--r--test/clustering_management_SUITE.erl39
-rw-r--r--test/quorum_queue_SUITE.erl28
-rw-r--r--test/rabbit_fifo_SUITE.erl189
-rw-r--r--test/rabbit_fifo_int_SUITE.erl2
-rw-r--r--test/rabbit_fifo_prop_SUITE.erl348
18 files changed, 850 insertions, 244 deletions
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 27d0037f7b..d070df7a92 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -48,6 +48,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" (
set RABBITMQ_EBIN_ROOT=!RABBITMQ_HOME!\ebin
+CALL :convert_forward_slashes !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE
CALL :get_noex !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE_NOEX
if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
@@ -59,6 +60,7 @@ if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
)
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE
CALL :get_noex !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE_NOEX
if "!RABBITMQ_CONFIG_FILE!" == "!RABBITMQ_CONFIG_FILE_NOEX!" (
@@ -100,6 +102,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_FILE!" (
)
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE
CALL :get_noex !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE_NOEX
if not "!RABBITMQ_CONFIG_ARG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_ARG_FILE!" (
@@ -279,6 +282,13 @@ EXIT /B 0
set "%~2=%~dpn1"
EXIT /B 0
+rem Convert unix style path separators into windows style path separators
+rem needed for comparing with _NOEX variables
+rem rabbitmq/rabbitmq-server#1962
+:convert_forward_slashes
+set "%~2=%~dpf1"
+EXIT /B 0
+
endlocal
endlocal
endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index f85ff75530..6ac0af5dc9 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -131,6 +131,7 @@ if errorlevel 1 (
set RABBITMQ_EBIN_ROOT=!RABBITMQ_HOME!\ebin
+CALL :convert_forward_slashes !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE
CALL :get_noex !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE_NOEX
if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
@@ -142,6 +143,7 @@ if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
)
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE
CALL :get_noex !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE_NOEX
if "!RABBITMQ_CONFIG_FILE!" == "!RABBITMQ_CONFIG_FILE_NOEX!" (
@@ -183,6 +185,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_FILE!" (
)
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE
CALL :get_noex !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE_NOEX
if not "!RABBITMQ_CONFIG_ARG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_ARG_FILE!" (
@@ -393,6 +396,13 @@ EXIT /B 0
set "%~2=%~dpn1"
EXIT /B 0
+rem Convert unix style path separators into windows style path separators
+rem needed for comparing with _NOEX variables
+rem rabbitmq/rabbitmq-server#1962
+:convert_forward_slashes
+set "%~2=%~dpf1"
+EXIT /B 0
+
endlocal
endlocal
endlocal
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl
index 984ee5371d..954d003991 100644
--- a/src/rabbit_access_control.erl
+++ b/src/rabbit_access_control.erl
@@ -125,20 +125,30 @@ check_user_loopback(Username, SockOrAddr) ->
false -> not_allowed
end.
--spec check_vhost_access
- (rabbit_types:user(), rabbit_types:vhost(),
- rabbit_net:socket() | #authz_socket_info{}) ->
- 'ok' | rabbit_types:channel_exit().
-
+get_authz_data_from({ip, Address}) ->
+ #{peeraddr => Address};
+get_authz_data_from({socket, Sock}) ->
+ {ok, {Address, _Port}} = rabbit_net:peername(Sock),
+ #{peeraddr => Address};
+get_authz_data_from(undefined) ->
+ undefined.
+
+% Note: ip can be either a tuple or, a binary if reverse_dns_lookups
+% is enabled and it's a direct connection.
+-spec check_vhost_access(User :: rabbit_types:user(),
+ VHostPath :: rabbit_types:vhost(),
+ AuthzRawData :: {socket, rabbit_net:socket()} | {ip, inet:ip_address() | binary()} | undefined) ->
+ 'ok' | rabbit_types:channel_exit().
check_vhost_access(User = #user{username = Username,
- authz_backends = Modules}, VHostPath, Sock) ->
+ authz_backends = Modules}, VHostPath, AuthzRawData) ->
+ AuthzData = get_authz_data_from(AuthzRawData),
lists:foldl(
fun({Mod, Impl}, ok) ->
check_access(
fun() ->
rabbit_vhost:exists(VHostPath) andalso
Mod:check_vhost_access(
- auth_user(User, Impl), VHostPath, Sock)
+ auth_user(User, Impl), VHostPath, AuthzData)
end,
Mod, "access to vhost '~s' refused for user '~s'",
[VHostPath, Username], not_allowed);
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 417f0d6374..2f8e85f0f3 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -113,7 +113,7 @@ internal_check_user_login(Username, Fun) ->
Refused
end.
-check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) ->
+check_vhost_access(#auth_user{username = Username}, VHostPath, _AuthzData) ->
case mnesia:dirty_read({rabbit_user_permission,
#user_vhost{username = Username,
virtual_host = VHostPath}}) of
diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl
index 461a43e5d9..24d2163f16 100644
--- a/src/rabbit_core_ff.erl
+++ b/src/rabbit_core_ff.erl
@@ -44,12 +44,12 @@
quorum_queue_migration(FeatureName, _FeatureProps, enable) ->
Tables = ?quorum_queue_tables,
- rabbit_table:wait(Tables),
+ rabbit_table:wait(Tables, _Retry = true),
Fields = amqqueue:fields(amqqueue_v2),
migrate_to_amqqueue_with_type(FeatureName, Tables, Fields);
quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) ->
Tables = ?quorum_queue_tables,
- rabbit_table:wait(Tables),
+ rabbit_table:wait(Tables, _Retry = true),
Fields = amqqueue:fields(amqqueue_v2),
mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso
mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields.
diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl
index 696b25f5e4..6a3cafbc28 100644
--- a/src/rabbit_direct.erl
+++ b/src/rabbit_direct.erl
@@ -181,14 +181,11 @@ notify_auth_result(Username, AuthResult, ExtraProps) ->
ExtraProps,
rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']).
-authz_socket_info_direct(Infos) ->
- #authz_socket_info{sockname={proplists:get_value(host, Infos),
- proplists:get_value(port, Infos)},
- peername={proplists:get_value(peer_host, Infos),
- proplists:get_value(peer_port, Infos)}}.
-
connect1(User, VHost, Protocol, Pid, Infos) ->
- try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of
+ % Note: peer_host can be either a tuple or
+ % a binary if reverse_dns_lookups is enabled
+ PeerHost = proplists:get_value(peer_host, Infos),
+ try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}) of
ok -> ok = pg_local:join(rabbit_direct, Pid),
rabbit_core_metrics:connection_created(Pid, Infos),
rabbit_event:notify(connection_created, Infos),
diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl
index 2441587bce..4e5e3d2a28 100644
--- a/src/rabbit_disk_monitor.erl
+++ b/src/rabbit_disk_monitor.erl
@@ -39,7 +39,7 @@
-export([get_disk_free_limit/0, set_disk_free_limit/1,
get_min_check_interval/0, set_min_check_interval/1,
get_max_check_interval/0, set_max_check_interval/1,
- get_disk_free/0]).
+ get_disk_free/0, set_enabled/1]).
-define(SERVER, ?MODULE).
-define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100).
@@ -112,10 +112,14 @@ set_max_check_interval(Interval) ->
gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity).
-spec get_disk_free() -> (integer() | 'unknown').
+-spec set_enabled(string()) -> 'ok'.
get_disk_free() ->
gen_server:call(?MODULE, get_disk_free, infinity).
+set_enabled(Enabled) ->
+ gen_server:call(?MODULE, {set_enabled, Enabled}, infinity).
+
%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
@@ -165,6 +169,15 @@ handle_call({set_max_check_interval, MaxInterval}, _From, State) ->
handle_call(get_disk_free, _From, State = #state { actual = Actual }) ->
{reply, Actual, State};
+handle_call({set_enabled, _Enabled = true}, _From, State) ->
+ start_timer(set_disk_limits(State, State#state.limit)),
+ rabbit_log:info("Free disk space monitor was enabled"),
+ {reply, ok, State#state{enabled = true}};
+handle_call({set_enabled, _Enabled = false}, _From, State) ->
+ erlang:cancel_timer(State#state.timer),
+ rabbit_log:info("Free disk space monitor was manually disabled"),
+ {reply, ok, State#state{enabled = false}};
+
handle_call(_Request, _From, State) ->
{noreply, State}.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index d397459d4f..129dab77ff 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -63,6 +63,7 @@
make_discard/2,
make_credit/4,
make_purge/0,
+ make_purge_nodes/1,
make_update_config/1
]).
@@ -84,6 +85,7 @@
delivery_count :: non_neg_integer(),
drain :: boolean()}).
-record(purge, {}).
+-record(purge_nodes, {nodes :: [node()]}).
-record(update_config, {config :: config()}).
-opaque protocol() ::
@@ -94,6 +96,7 @@
#discard{} |
#credit{} |
#purge{} |
+ #purge_nodes{} |
#update_config{}.
-type command() :: protocol() | ra_machine:builtin_command().
@@ -189,11 +192,9 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId},
apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId},
#?MODULE{consumers = Cons0} = State) ->
case Cons0 of
- #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} ->
- Checked = maps:without(MsgIds, Checked0),
+ #{ConsumerId := #consumer{checked_out = Checked0}} ->
Returned = maps:with(MsgIds, Checked0),
- MsgNumMsgs = maps:values(Returned),
- return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State);
+ return(Meta, ConsumerId, Returned, [], State);
_ ->
{State, ok}
end;
@@ -322,7 +323,7 @@ apply(#{index := RaftIdx}, #purge{},
%% reverse the effects ourselves
{State, {purge, Total},
lists:reverse([garbage_collection | Effects])};
-apply(_, {down, Pid, noconnection},
+apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
cfg = #cfg{consumer_strategy = single_active},
waiting_consumers = Waiting0,
@@ -330,17 +331,30 @@ apply(_, {down, Pid, noconnection},
Node = node(Pid),
%% if the pid refers to the active consumer, mark it as suspected and return
%% it to the waiting queue
- {State1, Effects0} = case maps:to_list(Cons0) of
- [{{_, P} = Cid, C}] when node(P) =:= Node ->
- %% the consumer should be returned to waiting
- %%
- Effs = consumer_update_active_effects(
- State0, Cid, C, false, suspected_down, []),
- {State0#?MODULE{consumers = #{},
- waiting_consumers = Waiting0 ++ [{Cid, C}]},
- Effs};
- _ -> {State0, []}
- end,
+ {State1, Effects0} =
+ case maps:to_list(Cons0) of
+ [{{_, P} = Cid, C0}] when node(P) =:= Node ->
+ %% the consumer should be returned to waiting
+ %% and checked out messages should be returned
+ Effs = consumer_update_active_effects(
+ State0, Cid, C0, false, suspected_down, []),
+ Checked = C0#consumer.checked_out,
+ Credit = increase_credit(C0, maps:size(Checked)),
+ {St, Effs1} = return_all(State0, Effs,
+ Cid, C0#consumer{credit = Credit}),
+ %% if the consumer was cancelled there is a chance it got
+ %% removed when returning hence we need to be defensive here
+ Waiting = case St#?MODULE.consumers of
+ #{Cid := C} ->
+ Waiting0 ++ [{Cid, C}];
+ _ ->
+ Waiting0
+ end,
+ {St#?MODULE{consumers = #{},
+ waiting_consumers = Waiting},
+ Effs1};
+ _ -> {State0, []}
+ end,
WaitingConsumers = update_waiting_consumer_status(Node, State1,
suspected_down),
@@ -354,8 +368,8 @@ apply(_, {down, Pid, noconnection},
(_, E) -> E
end, Enqs0),
Effects = [{monitor, node, Node} | Effects1],
- {State#?MODULE{enqueuers = Enqs}, ok, Effects};
-apply(_, {down, Pid, noconnection},
+ checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(Meta, {down, Pid, noconnection},
#?MODULE{consumers = Cons0,
enqueuers = Enqs0} = State0) ->
%% A node has been disconnected. This doesn't necessarily mean that
@@ -367,23 +381,22 @@ apply(_, {down, Pid, noconnection},
%% all pids for the disconnected node will be marked as suspected not just
%% the one we got the `down' command for
Node = node(Pid),
- ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0),
- {Cons, State, Effects1} =
- maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0,
- status = up} = C,
- {Co, St0, Eff}) when node(P) =:= Node ->
- {St, Eff0} = return_all(St0, Checked0, Eff, K, C),
- Credit = increase_credit(C, maps:size(Checked0)),
- Eff1 = ConsumerUpdateActiveFun(St, K, C, false,
- suspected_down, Eff0),
- {maps:put(K, C#consumer{status = suspected_down,
- credit = Credit,
- checked_out = #{}}, Co),
- St, Eff1};
- (K, C, {Co, St, Eff}) ->
- {maps:put(K, C, Co), St, Eff}
- end, {#{}, State0, []}, Cons0),
+ {State, Effects1} =
+ maps:fold(
+ fun({_, P} = Cid, #consumer{checked_out = Checked0,
+ status = up} = C0,
+ {St0, Eff}) when node(P) =:= Node ->
+ Credit = increase_credit(C0, map_size(Checked0)),
+ C = C0#consumer{status = suspected_down,
+ credit = Credit},
+ {St, Eff0} = return_all(St0, Eff, Cid, C),
+ Eff1 = consumer_update_active_effects(St, Cid, C, false,
+ suspected_down, Eff0),
+ {St, Eff1};
+ (_, _, {St, Eff}) ->
+ {St, Eff}
+ end, {State0, []}, Cons0),
Enqs = maps:map(fun(P, E) when node(P) =:= Node ->
E#enqueuer{status = suspected_down};
(_, E) -> E
@@ -392,34 +405,15 @@ apply(_, {down, Pid, noconnection},
% Monitor the node so that we can "unsuspect" these processes when the node
% comes back, then re-issue all monitors and discover the final fate of
% these processes
- Effects2 = case maps:size(Cons) of
- 0 ->
- [{aux, inactive}, {monitor, node, Node}];
- _ ->
- [{monitor, node, Node}]
- end ++ Effects1,
- %% TODO: should we run a checkout here?
- {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2};
-apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0,
- enqueuers = Enqs0} = State0) ->
- % Remove any enqueuer for the same pid and enqueue any pending messages
- % This should be ok as we won't see any more enqueues from this pid
- State1 = case maps:take(Pid, Enqs0) of
- {#enqueuer{pending = Pend}, Enqs} ->
- lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
- enqueue(RIdx, RawMsg, S)
- end, State0#?MODULE{enqueuers = Enqs}, Pend);
- error ->
- State0
- end,
- {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
- % return checked out messages to main queue
- % Find the consumers for the down pid
- DownConsumers = maps:keys(
- maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
- {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) ->
- cancel_consumer(ConsumerId, S, E, down)
- end, {State2, Effects1}, DownConsumers),
+ Effects = case maps:size(State#?MODULE.consumers) of
+ 0 ->
+ [{aux, inactive}, {monitor, node, Node}];
+ _ ->
+ [{monitor, node, Node}]
+ end ++ Effects1,
+ checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects);
+apply(Meta, {down, Pid, _Info}, State0) ->
+ {State, Effects} = handle_down(Pid, State0),
checkout(Meta, State, Effects);
apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
@@ -450,16 +444,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0,
Acc
end, {Cons0, SQ0, Monitors}, Cons0),
Waiting = update_waiting_consumer_status(Node, State0, up),
- State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1,
+ State1 = State0#?MODULE{consumers = Cons1,
+ enqueuers = Enqs1,
service_queue = SQ,
waiting_consumers = Waiting},
{State, Effects} = activate_next_consumer(State1, Effects1),
checkout(Meta, State, Effects);
apply(_, {nodedown, _Node}, State) ->
{State, ok};
+apply(_, #purge_nodes{nodes = Nodes}, State0) ->
+ {State, Effects} = lists:foldl(fun(Node, {S, E}) ->
+ purge_node(Node, S, E)
+ end, {State0, []}, Nodes),
+ {State, ok, Effects};
apply(Meta, #update_config{config = Conf}, State) ->
checkout(Meta, update_config(Conf, State), []).
+purge_node(Node, State, Effects) ->
+ lists:foldl(fun(Pid, {S0, E0}) ->
+ {S, E} = handle_down(Pid, S0),
+ {S, E0 ++ E}
+ end, {State, Effects}, all_pids_for(Node, State)).
+
+%% any downs that re not noconnection
+handle_down(Pid, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0} = State0) ->
+ % Remove any enqueuer for the same pid and enqueue any pending messages
+ % This should be ok as we won't see any more enqueues from this pid
+ State1 = case maps:take(Pid, Enqs0) of
+ {#enqueuer{pending = Pend}, Enqs} ->
+ lists:foldl(fun ({_, RIdx, RawMsg}, S) ->
+ enqueue(RIdx, RawMsg, S)
+ end, State0#?MODULE{enqueuers = Enqs}, Pend);
+ error ->
+ State0
+ end,
+ {Effects1, State2} = handle_waiting_consumer_down(Pid, State1),
+ % return checked out messages to main queue
+ % Find the consumers for the down pid
+ DownConsumers = maps:keys(
+ maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)),
+ lists:foldl(fun(ConsumerId, {S, E}) ->
+ cancel_consumer(ConsumerId, S, E, down)
+ end, {State2, Effects1}, DownConsumers).
+
consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) ->
fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) ->
consumer_update_active_effects(State, ConsumerId, Consumer, Active,
@@ -558,8 +586,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name,
query_consumer_count(State), % Consumers
EnqueueBytes,
CheckoutBytes},
+ %% TODO: call a handler that works out if any known nodes need to be
+ %% purged and emit a command effect to append this to the log
[{mod_call, rabbit_quorum_queue,
- handle_tick, [QName, Metrics]}, {aux, emit}].
+ handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}].
-spec overview(state()) -> map().
overview(#?MODULE{consumers = Cons,
@@ -811,9 +841,9 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}},
| Effects].
cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
- case maps:take(ConsumerId, C0) of
- {Consumer, Cons1} ->
- {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0,
+ case C0 of
+ #{ConsumerId := Consumer} ->
+ {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0,
Effects0, Reason),
%% The effects are emitted before the consumer is actually removed
%% if the consumer has unacked messages. This is a bit weird but
@@ -826,7 +856,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) ->
_ ->
{S, Effects}
end;
- error ->
+ _ ->
%% already removed: do nothing
{S0, Effects0}
end.
@@ -863,9 +893,9 @@ activate_next_consumer(#?MODULE{consumers = Cons,
-maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
- Cons1, #?MODULE{consumers = C0,
- service_queue = SQ0} = S0,
+maybe_return_all(ConsumerId, Consumer,
+ #?MODULE{consumers = C0,
+ service_queue = SQ0} = S0,
Effects0, Reason) ->
case Reason of
consumer_cancel ->
@@ -875,11 +905,12 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer,
credit = 0,
status = cancelled},
C0, SQ0, Effects0),
- {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1};
+ {S0#?MODULE{consumers = Cons,
+ service_queue = SQ}, Effects1};
down ->
- {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId,
- Consumer),
- {S1#?MODULE{consumers = Cons1}, Effects1}
+ {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer),
+ {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)},
+ Effects1}
end.
apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) ->
@@ -1009,41 +1040,46 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0,
snd(T) ->
element(2, T).
-return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked,
- Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) ->
- Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, length(MsgNumMsgs))},
- {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0,
- SQ0, Effects0),
- {State1, Effects2} = lists:foldl(
- fun({'$prefix_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0,
- ConsumerId, Con);
- ({'$empty_msg', _} = Msg, {S0, E0}) ->
- return_one(0, Msg, S0, E0,
- ConsumerId, Con);
- ({MsgNum, Msg}, {S0, E0}) ->
- return_one(MsgNum, Msg, S0, E0,
- ConsumerId, Con)
- end, {State0, Effects1}, MsgNumMsgs),
- checkout(Meta, State1#?MODULE{consumers = Cons,
- service_queue = SQ},
- Effects2).
+return(Meta, ConsumerId, Returned,
+ Effects0, #?MODULE{service_queue = SQ0} = State0) ->
+ {State1, Effects1} = maps:fold(
+ fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg';
+ Tag == '$empty_msg'->
+ return_one(MsgId, 0, Msg, S0, E0, ConsumerId);
+ (MsgId, {MsgNum, Msg}, {S0, E0}) ->
+ return_one(MsgId, MsgNum, Msg, S0, E0,
+ ConsumerId)
+ end, {State0, Effects0}, Returned),
+ #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers,
+ Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))},
+ {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0,
+ SQ0, Effects1),
+ State = State1#?MODULE{consumers = Cons,
+ service_queue = SQ},
+ checkout(Meta, State, Effects2).
% used to processes messages that are finished
-complete(ConsumerId, MsgRaftIdxs, NumDiscarded,
- Con0, Checked, Effects0,
+complete(ConsumerId, Discarded,
+ #consumer{checked_out = Checked} = Con0, Effects0,
#?MODULE{consumers = Cons0, service_queue = SQ0,
ra_indexes = Indexes0} = State0) ->
+ MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
%% credit_mode = simple_prefetch should automatically top-up credit
%% as messages are simple_prefetch or otherwise returned
- Con = Con0#consumer{checked_out = Checked,
- credit = increase_credit(Con0, NumDiscarded)},
+ Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked),
+ credit = increase_credit(Con0, maps:size(Discarded))},
{Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0,
SQ0, Effects0),
Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0,
MsgRaftIdxs),
- {State0#?MODULE{consumers = Cons,
+ State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$prefix_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc);
+ ({'$empty_msg', Header}, Acc) ->
+ add_bytes_settle(Header, Acc)
+ end, State0, maps:values(Discarded)),
+ {State1#?MODULE{consumers = Cons,
ra_indexes = Indexes,
service_queue = SQ}, Effects}.
@@ -1062,21 +1098,9 @@ increase_credit(#consumer{credit = Current}, Credit) ->
complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId,
#consumer{checked_out = Checked0} = Con0,
Effects0, State0) ->
- Checked = maps:without(MsgIds, Checked0),
Discarded = maps:with(MsgIds, Checked0),
- MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)],
- State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) ->
- add_bytes_settle(Header, Acc);
- ({'$prefix_msg', Header}, Acc) ->
- add_bytes_settle(Header, Acc);
- ({'$empty_msg', Header}, Acc) ->
- add_bytes_settle(Header, Acc)
- end, State0, maps:values(Discarded)),
- %% need to pass the length of discarded as $prefix_msgs would be filtered
- %% by the above list comprehension
- {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs,
- maps:size(Discarded),
- Con0, Checked, Effects0, State1),
+ {State2, Effects1} = complete(ConsumerId, Discarded, Con0,
+ Effects0, State0),
{State, ok, Effects} = checkout(Meta, State2, Effects1),
% settle metrics are incremented separately
update_smallest_raft_index(IncomingRaftIdx, State, Effects).
@@ -1136,79 +1160,80 @@ find_next_cursor(Smallest, Cursors0, Potential) ->
{Potential, Cursors0}
end.
-return_one(0, {Tag, Header0},
+return_one(MsgId, 0, {Tag, Header0},
#?MODULE{returns = Returns,
+ consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'->
- Header = maps:update_with(delivery_count,
- fun (C) -> C+1 end,
+ Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
Msg = {Tag, Header},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
- Checked = Con#consumer.checked_out,
- {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked,
- Effects0, State0),
- {add_bytes_settle(Header, State1), Effects};
+ complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0);
_ ->
%% this should not affect the release cursor in any way
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
State1 = case Tag of
'$empty_msg' -> State0;
_ -> add_in_memory_counts(maps:get(size, Header), State0)
end,
- {add_bytes_return(Header,
- State1#?MODULE{returns = lqueue:in(Msg, Returns)}),
+ {add_bytes_return(
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in(Msg, Returns)}),
Effects0}
end;
-return_one(MsgNum, {RaftId, {Header0, RawMsg}},
+return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}},
#?MODULE{returns = Returns,
+ consumers = Consumers,
cfg = #cfg{delivery_limit = DeliveryLimit}} = State0,
- Effects0, ConsumerId, Con) ->
- Header = maps:update_with(delivery_count,
- fun (C) -> C+1 end,
+ Effects0, ConsumerId) ->
+ #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers),
+ Header = maps:update_with(delivery_count, fun (C) -> C+1 end,
1, Header0),
Msg = {RaftId, {Header, RawMsg}},
case maps:get(delivery_count, Header) of
DeliveryCount when DeliveryCount > DeliveryLimit ->
DlMsg = {MsgNum, Msg},
- Effects = dead_letter_effects(delivery_limit,
- #{none => DlMsg},
+ Effects = dead_letter_effects(delivery_limit, #{none => DlMsg},
State0, Effects0),
- Checked = Con#consumer.checked_out,
- {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked,
- Effects, State0),
- {add_bytes_settle(Header, State1), Effects1};
+ complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0);
_ ->
+ Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)},
+ %% this should not affect the release cursor in any way
State1 = case RawMsg of
'empty' -> State0;
_ -> add_in_memory_counts(maps:get(size, Header), State0)
end,
- %% this should not affect the release cursor in any way
- {add_bytes_return(Header,
- State1#?MODULE{returns =
- lqueue:in({MsgNum, Msg}, Returns)}),
+ {add_bytes_return(
+ Header,
+ State1#?MODULE{consumers = Consumers#{ConsumerId => Con},
+ returns = lqueue:in({MsgNum, Msg}, Returns)}),
Effects0}
end.
-return_all(State0, Checked0, Effects0, ConsumerId, Consumer) ->
+return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId,
+ #consumer{checked_out = Checked0} = Con) ->
%% need to sort the list so that we return messages in the order
%% they were checked out
Checked = lists:sort(maps:to_list(Checked0)),
- lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) ->
- return_one(0, Msg, S, E, ConsumerId, Consumer);
- ({_, {'$empty_msg', _} = Msg}, {S, E}) ->
- return_one(0, Msg, S, E, ConsumerId, Consumer);
- ({_, {MsgNum, Msg}}, {S, E}) ->
- return_one(MsgNum, Msg, S, E, ConsumerId, Consumer)
- end, {State0, Effects0}, Checked).
+ State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}},
+ lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) ->
+ return_one(MsgId, 0, Msg, S, E, ConsumerId);
+ ({MsgId, {MsgNum, Msg}}, {S, E}) ->
+ return_one(MsgId, MsgNum, Msg, S, E, ConsumerId)
+ end, {State, Effects0}, Checked).
%% checkout new messages to consumers
%% reverses the effects list
checkout(#{index := Index}, State0, Effects0) ->
{State1, _Result, Effects1} = checkout0(checkout_one(State0),
Effects0, {#{}, #{}}),
- case evaluate_limit(State0#?MODULE.ra_indexes, false,
- State1, Effects1) of
+ case evaluate_limit(false, State1, Effects1) of
{State, true, Effects} ->
update_smallest_raft_index(Index, State, Effects);
{State, false, Effects} ->
@@ -1238,17 +1263,16 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) ->
end,
{State0, ok, lists:reverse(Effects1)}.
-evaluate_limit(_OldIndexes, Result,
+evaluate_limit(Result,
#?MODULE{cfg = #cfg{max_length = undefined,
max_bytes = undefined}} = State,
Effects) ->
{State, Result, Effects};
-evaluate_limit(OldIndexes, Result,
- State0, Effects0) ->
+evaluate_limit(Result, State0, Effects0) ->
case is_over_limit(State0) of
true ->
{State, Effects} = drop_head(State0, Effects0),
- evaluate_limit(OldIndexes, true, State, Effects);
+ evaluate_limit(true, State, Effects);
false ->
{State0, Result, Effects0}
end.
@@ -1596,6 +1620,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) ->
-spec make_purge() -> protocol().
make_purge() -> #purge{}.
+-spec make_purge_nodes([node()]) -> protocol().
+make_purge_nodes(Nodes) ->
+ #purge_nodes{nodes = Nodes}.
+
-spec make_update_config(config()) -> protocol().
make_update_config(Config) ->
#update_config{config = Config}.
@@ -1643,6 +1671,39 @@ message_size(Msg) ->
%% probably only hit this for testing so ok to use erts_debug
erts_debug:size(Msg).
+all_nodes(#?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Nodes0 = maps:fold(fun({_, P}, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, #{}, Cons0),
+ Nodes1 = maps:fold(fun(P, _, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes0, Enqs0),
+ maps:keys(
+ lists:foldl(fun({{_, P}, _}, Acc) ->
+ Acc#{node(P) => ok}
+ end, Nodes1, WaitingConsumers0)).
+
+all_pids_for(Node, #?MODULE{consumers = Cons0,
+ enqueuers = Enqs0,
+ waiting_consumers = WaitingConsumers0}) ->
+ Cons = maps:fold(fun({_, P}, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, [], Cons0),
+ Enqs = maps:fold(fun(P, _, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, _, Acc) -> Acc
+ end, Cons, Enqs0),
+ lists:foldl(fun({{_, P}, _}, Acc)
+ when node(P) =:= Node ->
+ [P | Acc];
+ (_, Acc) -> Acc
+ end, Enqs, WaitingConsumers0).
+
suspected_pids_for(Node, #?MODULE{consumers = Cons0,
enqueuers = Enqs0,
waiting_consumers = WaitingConsumers0}) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 65383b80af..162badc33d 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -945,10 +945,13 @@ with_running_or_clean_mnesia(Fun) ->
false ->
SavedMnesiaDir = dir(),
application:unset_env(mnesia, dir),
+ SchemaLoc = application:get_env(mnesia, schema_location, opt_disc),
+ application:set_env(mnesia, schema_location, ram),
mnesia:start(),
Result = Fun(),
application:stop(mnesia),
application:set_env(mnesia, dir, SavedMnesiaDir),
+ application:set_env(mnesia, schema_location, SchemaLoc),
Result
end.
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 9abbd7cfff..45522b4fa3 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -389,6 +389,7 @@ force_connection_event_refresh(Ref) ->
[rabbit_reader:force_event_refresh(C, Ref) || C <- connections()],
ok.
+-spec failed_to_recv_proxy_header(_, _) -> no_return().
failed_to_recv_proxy_header(Ref, Error) ->
Msg = case Error of
closed -> "error when receiving proxy header: TCP socket was ~p prematurely";
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index b11a40cead..8258584e3c 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -28,7 +28,7 @@
-export([cluster_state/1, status/2]).
-export([update_consumer_handler/8, update_consumer/9]).
-export([cancel_consumer_handler/2, cancel_consumer/3]).
--export([become_leader/2, handle_tick/2]).
+-export([become_leader/2, handle_tick/3]).
-export([rpc_delete_metrics/1]).
-export([format/1]).
-export([open_files/1]).
@@ -249,7 +249,9 @@ rpc_delete_metrics(QName) ->
ets:delete(queue_metrics, QName),
ok.
-handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
+handle_tick(QName,
+ {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack},
+ Nodes) ->
%% this makes calls to remote processes so cannot be run inside the
%% ra server
Self = self(),
@@ -272,7 +274,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) ->
{messages_ready, MR},
{messages_unacknowledged, MU},
{reductions, R}]),
- ok = repair_leader_record(QName, Self)
+ ok = repair_leader_record(QName, Self),
+ ExpectedNodes = rabbit_mnesia:cluster_nodes(all),
+ case Nodes -- ExpectedNodes of
+ [] ->
+ ok;
+ Stale ->
+ rabbit_log:info("~s: stale nodes detected. Purging ~w~n",
+ [rabbit_misc:rs(QName), Stale]),
+ %% pipeline purge command
+ {ok, Q} = rabbit_amqqueue:lookup(QName),
+ ok = ra:pipeline_command(amqqueue:get_pid(Q),
+ rabbit_fifo:make_purge_nodes(Stale)),
+
+ ok
+ end
end),
ok.
@@ -663,7 +679,8 @@ add_member(VHost, Name, Node) ->
true ->
case lists:member(Node, QNodes) of
true ->
- {error, already_a_member};
+ %% idempotent by design
+ ok;
false ->
add_member(Q, Node)
end
@@ -709,16 +726,12 @@ delete_member(VHost, Name, Node) ->
{error, classic_queue_not_supported};
{ok, Q} when ?amqqueue_is_quorum(Q) ->
QNodes = amqqueue:get_quorum_nodes(Q),
- case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of
+ case lists:member(Node, QNodes) of
false ->
- {error, node_not_running};
+ %% idempotent by design
+ ok;
true ->
- case lists:member(Node, QNodes) of
- false ->
- {error, not_a_member};
- true ->
- delete_member(Q, Node)
- end
+ delete_member(Q, Node)
end;
{error, not_found} = E ->
E
@@ -733,7 +746,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) ->
%% deleting the last member is not allowed
{error, last_node};
_ ->
- case ra:leave_and_delete_server(ServerId) of
+ case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of
ok ->
Fun = fun(Q1) ->
amqqueue:set_quorum_nodes(
diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl
index 4aea1495a4..aa26cf5482 100644
--- a/src/rabbit_reader.erl
+++ b/src/rabbit_reader.erl
@@ -1229,7 +1229,7 @@ handle_method0(#'connection.open'{virtual_host = VHost},
throttle = Throttle}) ->
ok = is_over_connection_limit(VHost, User),
- ok = rabbit_access_control:check_vhost_access(User, VHost, Sock),
+ ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}),
ok = is_vhost_alive(VHost, User),
NewConnection = Connection#connection{vhost = VHost},
ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol),
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 60e9c9dac2..7dfc5fb573 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -16,7 +16,7 @@
-module(rabbit_table).
--export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1,
+-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, wait/2,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0,
wait_for_replicated/0]).
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
index c9f1565c51..40317ec604 100644
--- a/test/clustering_management_SUITE.erl
+++ b/test/clustering_management_SUITE.erl
@@ -55,7 +55,8 @@ groups() ->
force_boot,
status_with_alarm,
pid_file_and_await_node_startup,
- await_running_count
+ await_running_count,
+ start_with_invalid_schema_in_path
]},
{cluster_size_4, [], [
forget_promotes_offline_slave
@@ -121,6 +122,42 @@ end_per_testcase(Testcase, Config) ->
%% Testcases.
%% -------------------------------------------------------------------
+
+start_with_invalid_schema_in_path(Config) ->
+ [Rabbit, Hare] = cluster_members(Config),
+ stop_app(Rabbit),
+ stop_app(Hare),
+
+ create_bad_schema(Rabbit, Hare, Config),
+
+ start_app(Hare),
+ case start_app(Rabbit) of
+ ok -> ok;
+ ErrRabbit -> error({unable_to_start_with_bad_schema_in_work_dir, ErrRabbit})
+ end.
+
+create_bad_schema(Rabbit, Hare, Config) ->
+
+ {ok, RabbitMnesiaDir} = rpc:call(Rabbit, application, get_env, [mnesia, dir]),
+ {ok, HareMnesiaDir} = rpc:call(Hare, application, get_env, [mnesia, dir]),
+ %% Make sure we don't use the current dir:
+ PrivDir = ?config(priv_dir, Config),
+ ct:pal("Priv dir ~p~n", [PrivDir]),
+ ok = filelib:ensure_dir(filename:join(PrivDir, "file")),
+
+ ok = rpc:call(Rabbit, file, set_cwd, [PrivDir]),
+ ok = rpc:call(Hare, file, set_cwd, [PrivDir]),
+
+ ok = rpc:call(Rabbit, application, unset_env, [mnesia, dir]),
+ ok = rpc:call(Hare, application, unset_env, [mnesia, dir]),
+ ok = rpc:call(Rabbit, mnesia, create_schema, [[Rabbit, Hare]]),
+ ok = rpc:call(Rabbit, mnesia, start, []),
+ {atomic,ok} = rpc:call(Rabbit, mnesia, create_table,
+ [rabbit_queue, [{ram_copies, [Rabbit, Hare]}]]),
+ stopped = rpc:call(Rabbit, mnesia, stop, []),
+ ok = rpc:call(Rabbit, application, set_env, [mnesia, dir, RabbitMnesiaDir]),
+ ok = rpc:call(Hare, application, set_env, [mnesia, dir, HareMnesiaDir]).
+
join_and_part_cluster(Config) ->
[Rabbit, Hare, Bunny] = cluster_members(Config),
assert_not_clustered(Rabbit),
diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl
index 31d6a4c21b..6b623304e8 100644
--- a/test/quorum_queue_SUITE.erl
+++ b/test/quorum_queue_SUITE.erl
@@ -54,8 +54,9 @@ groups() ->
add_member_not_found,
delete_member_not_running,
delete_member_classic,
- delete_member_not_found,
- delete_member]
+ delete_member_queue_not_found,
+ delete_member,
+ delete_member_not_a_member]
++ all_tests()},
{cluster_size_2, [], memory_tests()},
{cluster_size_3, [], [
@@ -1235,7 +1236,8 @@ add_member_already_a_member(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- ?assertEqual({error, already_a_member},
+ %% idempotent by design
+ ?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, add_member,
[<<"/">>, QQ, Server])).
@@ -1273,7 +1275,8 @@ delete_member_not_running(Config) ->
QQ = ?config(queue_name, Config),
?assertEqual({'queue.declare_ok', QQ, 0, 0},
declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
- ?assertEqual({error, node_not_running},
+ %% it should be possible to delete members that are not online (e.g. decomissioned)
+ ?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, 'rabbit@burrow'])).
@@ -1286,7 +1289,7 @@ delete_member_classic(Config) ->
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, CQ, Server])).
-delete_member_not_found(Config) ->
+delete_member_queue_not_found(Config) ->
[Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
QQ = ?config(queue_name, Config),
?assertEqual({error, not_found},
@@ -1302,12 +1305,23 @@ delete_member(Config) ->
timer:sleep(100),
?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
+ [<<"/">>, QQ, Server])).
+
+delete_member_not_a_member(Config) ->
+ [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, Server),
+ QQ = ?config(queue_name, Config),
+ ?assertEqual({'queue.declare_ok', QQ, 0, 0},
+ declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])),
+ timer:sleep(100),
+ ?assertEqual(ok,
+ rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])),
- ?assertEqual({error, not_a_member},
+ %% idempotent by design
+ ?assertEqual(ok,
rpc:call(Server, rabbit_quorum_queue, delete_member,
[<<"/">>, QQ, Server])).
-
cleanup_data_dir(Config) ->
%% This test is slow, but also checks that we handle properly errors when
%% trying to delete a queue in minority. A case clause there had gone
diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl
index cf14a68d9d..310553dc56 100644
--- a/test/rabbit_fifo_SUITE.erl
+++ b/test/rabbit_fifo_SUITE.erl
@@ -283,6 +283,19 @@ duplicate_enqueue_test(_) ->
?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3),
ok.
+return_test(_) ->
+ Cid = {<<"cid">>, self()},
+ Cid2 = {<<"cid2">>, self()},
+ {State0, _} = enq(1, 1, msg, test_init(test)),
+ {State1, _} = check_auto(Cid, 2, State0),
+ {State2, _} = check_auto(Cid2, 3, State1),
+ {State3, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [0]), State2),
+ ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0,
+ State3#rabbit_fifo.consumers),
+ ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1,
+ State3#rabbit_fifo.consumers),
+ ok.
+
return_non_existent_test(_) ->
Cid = {<<"cid">>, self()},
{State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)),
@@ -382,14 +395,18 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) ->
% monitor both enqueuer and consumer
% because we received a noconnection we now need to monitor the node
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
- #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers),
+ #consumer{credit = 1,
+ checked_out = Ch,
+ status = suspected_down} = maps:get(Cid, State2a#rabbit_fifo.consumers),
+ ?assertEqual(#{}, Ch),
%% validate consumer has credit
{State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a),
?ASSERT_EFF({monitor, node, _}, Effects2),
?assertNoEffect({demonitor, process, _}, Effects2),
% when the node comes up we need to retry the process monitors for the
% disconnected processes
- {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2),
+ #consumer{status = up} = maps:get(Cid, State3#rabbit_fifo.consumers),
% try to re-monitor the suspect processes
?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3),
?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3),
@@ -407,6 +424,10 @@ down_with_noconnection_returns_unack_test(_) ->
{State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1),
?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)),
?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)),
+ ?assertMatch(#consumer{checked_out = Ch,
+ status = suspected_down}
+ when map_size(Ch) == 0,
+ maps:get(Cid, State2a#rabbit_fifo.consumers)),
ok.
down_with_noproc_enqueuer_is_cleaned_up_test(_) ->
@@ -426,7 +447,8 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) ->
?ASSERT_EFF({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects1),
- {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1),
+ {_State2, _, Effects2} = apply(meta(1),
+ rabbit_fifo:make_discard(Cid, [0]), State1),
?assertNoEffect({send_msg, _,
{delivery, _, [{0, {#{}, first}}]}, _},
Effects2),
@@ -458,9 +480,12 @@ tick_test(_) ->
{S3, {_, _}} = deq(4, Cid2, unsettled, S2),
{S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3),
- [{mod_call, _, _,
+ [{mod_call, rabbit_quorum_queue, handle_tick,
[#resource{},
- {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4),
+ {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3},
+ [_Node]
+ ]},
+ {aux, emit}] = rabbit_fifo:tick(1, S4),
ok.
@@ -556,7 +581,7 @@ purge_with_checkout_test(_) ->
?assertEqual(1, maps:size(Checked)),
ok.
-down_returns_checked_out_in_order_test(_) ->
+down_noproc_returns_checked_out_in_order_test(_) ->
S0 = test_init(?FUNCTION_NAME),
%% enqueue 100
S1 = lists:foldl(fun (Num, FS0) ->
@@ -572,6 +597,30 @@ down_returns_checked_out_in_order_test(_) ->
{S, _, _} = apply(meta(102), {down, self(), noproc}, S2),
Returns = lqueue:to_list(S#rabbit_fifo.returns),
?assertEqual(100, length(Returns)),
+ ?assertEqual(0, maps:size(S#rabbit_fifo.consumers)),
+ %% validate returns are in order
+ ?assertEqual(lists:sort(Returns), Returns),
+ ok.
+
+down_noconnection_returns_checked_out_test(_) ->
+ S0 = test_init(?FUNCTION_NAME),
+ NumMsgs = 20,
+ S1 = lists:foldl(fun (Num, FS0) ->
+ {FS, _} = enq(Num, Num, Num, FS0),
+ FS
+ end, S0, lists:seq(1, NumMsgs)),
+ ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)),
+ Cid = {<<"cid">>, self()},
+ {S2, _} = check(Cid, 101, 1000, S1),
+ #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers),
+ ?assertEqual(NumMsgs, maps:size(Checked)),
+ %% simulate down
+ {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2),
+ Returns = lqueue:to_list(S#rabbit_fifo.returns),
+ ?assertEqual(NumMsgs, length(Returns)),
+ ?assertMatch(#consumer{checked_out = Ch}
+ when map_size(Ch) == 0,
+ maps:get(Cid, S#rabbit_fifo.consumers)),
%% validate returns are in order
?assertEqual(lists:sort(Returns), Returns),
ok.
@@ -736,6 +785,41 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) ->
ok.
+single_active_returns_messages_on_noconnection_test(_) ->
+ R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => R,
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
+ Meta = #{index => 1},
+ Nodes = [n1],
+ ConsumerIds = [{_, DownPid}] =
+ [begin
+ B = atom_to_binary(N, utf8),
+ {<<"ctag_", B/binary>>,
+ test_util:fake_pid(N)}
+ end || N <- Nodes],
+ % adding some consumers
+ State1 = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
+ {State2, _} = enq(4, 1, msg1, State1),
+ % simulate node goes down
+ {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2),
+ %% assert the consumer is up
+ ?assertMatch([_], lqueue:to_list(State3#rabbit_fifo.returns)),
+ ?assertMatch([{_, #consumer{checked_out = Checked}}]
+ when map_size(Checked) == 0,
+ State3#rabbit_fifo.waiting_consumers),
+
+ ok.
+
single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)),
State0 = init(#{name => ?FUNCTION_NAME,
@@ -751,26 +835,30 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) ->
test_util:fake_pid(N)}
end || N <- Nodes],
% adding some consumers
- State1 = lists:foldl(
- fun(CId, Acc0) ->
- {Acc, _, _} =
- apply(Meta,
- make_checkout(CId,
- {once, 1, simple_prefetch}, #{}),
- Acc0),
- Acc
- end, State0, ConsumerIds),
+ State1a = lists:foldl(
+ fun(CId, Acc0) ->
+ {Acc, _, _} =
+ apply(Meta,
+ make_checkout(CId,
+ {once, 1, simple_prefetch}, #{}),
+ Acc0),
+ Acc
+ end, State0, ConsumerIds),
%% assert the consumer is up
?assertMatch(#{C1 := #consumer{status = up}},
- State1#rabbit_fifo.consumers),
+ State1a#rabbit_fifo.consumers),
+
+ {State1, _} = enq(10, 1, msg, State1a),
% simulate node goes down
{State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1),
%% assert a new consumer is in place and it is up
- ?assertMatch([{C2, #consumer{status = up}}],
- maps:to_list(State2#rabbit_fifo.consumers)),
+ ?assertMatch([{C2, #consumer{status = up,
+ checked_out = Ch}}]
+ when map_size(Ch) == 1,
+ maps:to_list(State2#rabbit_fifo.consumers)),
%% the disconnected consumer has been returned to waiting
?assert(lists:any(fun ({C,_}) -> C =:= C1 end,
@@ -836,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) ->
single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) ->
State0 = init(#{name => ?FUNCTION_NAME,
- queue_resource => rabbit_misc:r("/", queue,
- atom_to_binary(?FUNCTION_NAME, utf8)),
- release_cursor_interval => 0,
- single_active_consumer_on => true}),
+ queue_resource =>
+ rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ release_cursor_interval => 0,
+ single_active_consumer_on => true}),
DummyFunction = fun() -> ok end,
Pid1 = spawn(DummyFunction),
@@ -990,11 +1079,11 @@ active_flag_updated_when_consumer_suspected_unsuspected_test(_) ->
State1 = lists:foldl(AddConsumer, State0,
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1),
% 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node
?assertEqual(4 + 1, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID
?assertEqual(4 + 4, length(Effects3)).
@@ -1023,11 +1112,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co
[{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2},
{<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]),
- {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1),
+ {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1),
% one monitor and one consumer status update (deactivated)
?assertEqual(3, length(Effects2)),
- {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2),
+ {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2),
% for each consumer: 1 effect to monitor the consumer PID
?assertEqual(5, length(Effects3)).
@@ -1118,6 +1207,54 @@ single_active_with_credited_test(_) ->
State3#rabbit_fifo.waiting_consumers),
ok.
+purge_nodes_test(_) ->
+ Node = purged@node,
+ ThisNode = node(),
+ EnqPid = test_util:fake_pid(Node),
+ EnqPid2 = test_util:fake_pid(node()),
+ ConPid = test_util:fake_pid(Node),
+ Cid = {<<"tag">>, ConPid},
+ % WaitingPid = test_util:fake_pid(Node),
+
+ State0 = init(#{name => ?FUNCTION_NAME,
+ queue_resource => rabbit_misc:r("/", queue,
+ atom_to_binary(?FUNCTION_NAME, utf8)),
+ single_active_consumer_on => false}),
+ {State1, _, _} = apply(meta(1),
+ rabbit_fifo:make_enqueue(EnqPid, 1, msg1),
+ State0),
+ {State2, _, _} = apply(meta(2),
+ rabbit_fifo:make_enqueue(EnqPid2, 1, msg2),
+ State1),
+ {State3, _} = check(Cid, 3, 1000, State2),
+ {State4, _, _} = apply(meta(4),
+ {down, EnqPid, noconnection},
+ State3),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode, Node]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State4)),
+ %% assert there are both enqueuers and consumers
+ {State, _, _} = apply(meta(5),
+ rabbit_fifo:make_purge_nodes([Node]),
+ State4),
+
+ %% assert there are no enqueuers nor consumers
+ ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1,
+ State),
+
+ ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0,
+ State),
+ ?assertMatch(
+ [{mod_call, rabbit_quorum_queue, handle_tick,
+ [#resource{}, _Metrics,
+ [ThisNode]
+ ]},
+ {aux, emit}] , rabbit_fifo:tick(1, State)),
+ ok.
+
meta(Idx) ->
#{index => Idx, term => 1}.
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl
index f281d15795..d4ae417a78 100644
--- a/test/rabbit_fifo_int_SUITE.erl
+++ b/test/rabbit_fifo_int_SUITE.erl
@@ -54,7 +54,7 @@ end_per_group(_, Config) ->
init_per_testcase(TestCase, Config) ->
meck:new(rabbit_quorum_queue, [passthrough]),
- meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end),
+ meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end),
meck:expect(rabbit_quorum_queue, cancel_consumer_handler,
fun (_, _) -> ok end),
ra_server_sup_sup:remove_all(),
diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl
index 58e6c2452c..d670f2bdd7 100644
--- a/test/rabbit_fifo_prop_SUITE.erl
+++ b/test/rabbit_fifo_prop_SUITE.erl
@@ -8,6 +8,8 @@
-include_lib("proper/include/proper.hrl").
-include_lib("common_test/include/ct.hrl").
-include_lib("eunit/include/eunit.hrl").
+-include_lib("ra/include/ra.hrl").
+-include("src/rabbit_fifo.hrl").
%%%===================================================================
%%% Common Test callbacks
@@ -21,6 +23,7 @@ all() ->
all_tests() ->
[
+ test_run_log,
snapshots,
scenario1,
scenario2,
@@ -41,7 +44,14 @@ all_tests() ->
scenario17,
scenario18,
scenario19,
- scenario20
+ scenario20,
+ single_active,
+ single_active_01,
+ single_active_02,
+ single_active_03,
+ single_active_ordering,
+ single_active_ordering_01
+ % single_active_ordering_02
].
groups() ->
@@ -288,7 +298,9 @@ scenario17(_Config) ->
make_checkout(C2, cancel),
make_enqueue(E,2,<<"two">>),
{nodeup,rabbit@fake_node1},
+ %% this has no effect as was returned
make_settle(C1, [0]),
+ %% this should settle "one"
make_settle(C1, [1])
],
run_snapshot_test(#{name => ?FUNCTION_NAME,
@@ -343,6 +355,87 @@ scenario20(_Config) ->
max_in_memory_length => 1}, Commands),
ok.
+single_active_01(_Config) ->
+ C1Pid = test_util:fake_pid(rabbit@fake_node1),
+ C1 = {<<0>>, C1Pid},
+ C2Pid = test_util:fake_pid(rabbit@fake_node2),
+ C2 = {<<>>, C2Pid},
+ E = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<"one">>),
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_checkout(C1, cancel),
+ {nodeup,rabbit@fake_node1}
+ ],
+ ?assert(
+ single_active_prop(#{name => ?FUNCTION_NAME,
+ single_active_consumer_on => true
+ }, Commands, false)),
+ ok.
+
+single_active_02(_Config) ->
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ C2Pid = test_util:fake_pid(node()),
+ C2 = {<<>>, C2Pid},
+ E = test_util:fake_pid(node()),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E,1,<<"one">>),
+ {down,E,noconnection},
+ make_checkout(C2, {auto,1,simple_prefetch}),
+ make_checkout(C2, cancel),
+ {down,E,noconnection}
+ ],
+ Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1, undefined, undefined),
+ ?assert(single_active_prop(Conf, Commands, false)),
+ ok.
+
+single_active_03(_Config) ->
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ % C2Pid = test_util:fake_pid(rabbit@fake_node2),
+ % C2 = {<<>>, C2Pid},
+ Pid = test_util:fake_pid(node()),
+ E = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E, 1, 0),
+ make_enqueue(E, 2, 1),
+ {down, Pid, noconnection},
+ {nodeup, node()}
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, undefined, undefined),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
+test_run_log(_Config) ->
+ Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
+ run_proper(
+ fun () ->
+ ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength,
+ InMemoryBytes},
+ frequency([{10, {0, 0, false, 0, 0, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ boolean(),
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)),
+ collect({log_size, length(O)},
+ dump_generated(
+ config(?FUNCTION_NAME,
+ Length,
+ Bytes,
+ SingleActiveConsumer,
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O))))
+ end, [], 10).
+
snapshots(_Config) ->
run_proper(
fun () ->
@@ -368,6 +461,90 @@ snapshots(_Config) ->
InMemoryBytes), O))))
end, [], 2500).
+single_active(_Config) ->
+ Size = 2000,
+ run_proper(
+ fun () ->
+ ?FORALL({Length, Bytes, DeliveryLimit, InMemoryLength, InMemoryBytes},
+ frequency([{10, {0, 0, 0, 0, 0}},
+ {5, {oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined]),
+ oneof([range(1, 3), undefined]),
+ oneof([range(1, 10), undefined]),
+ oneof([range(1, 1000), undefined])
+ }}]),
+ ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)),
+ collect({log_size, length(O)},
+ single_active_prop(
+ config(?FUNCTION_NAME,
+ Length,
+ Bytes,
+ true,
+ DeliveryLimit,
+ InMemoryLength,
+ InMemoryBytes), O,
+ false))))
+ end, [], Size).
+
+single_active_ordering(_Config) ->
+ Size = 2000,
+ Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end},
+ run_proper(
+ fun () ->
+ ?FORALL(O, ?LET(Ops, log_gen_ordered(Size), expand(Ops, Fun)),
+ collect({log_size, length(O)},
+ single_active_prop(config(?FUNCTION_NAME,
+ undefined,
+ undefined,
+ true,
+ undefined,
+ undefined,
+ undefined), O,
+ true)))
+ end, [], Size).
+
+single_active_ordering_01(_Config) ->
+% [{enqueue,<0.145.0>,1,0},
+% {enqueue,<0.145.0>,1,1},
+% {checkout,{<<>>,<0.148.0>},{auto,1,simple_prefetch},#{ack => true,args => [],prefetch => 1,username => <<117,115,101,114>>}}
+% {enqueue,<0.140.0>,1,2},
+% {settle,{<<>>,<0.148.0>},[0]}]
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ E = test_util:fake_pid(rabbit@fake_node2),
+ E2 = test_util:fake_pid(rabbit@fake_node2),
+ Commands = [
+ make_enqueue(E, 1, 0),
+ make_enqueue(E, 2, 1),
+ make_checkout(C1, {auto,2,simple_prefetch}),
+ make_enqueue(E2, 1, 2),
+ make_settle(C1, [0])
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
+single_active_ordering_02(_Config) ->
+ %% this results in the pending enqueue being enqueued and violating
+ %% ordering
+% [{checkout, % {<<>>,<0.177.0>}, % {auto,1,simple_prefetch},
+% {enqueue,<0.172.0>,2,1},
+% {down,<0.172.0>,noproc},
+% {settle,{<<>>,<0.177.0>},[0]}]
+ C1Pid = test_util:fake_pid(node()),
+ C1 = {<<0>>, C1Pid},
+ E = test_util:fake_pid(node()),
+ Commands = [
+ make_checkout(C1, {auto,1,simple_prefetch}),
+ make_enqueue(E, 2, 1),
+ %% CANNOT HAPPEN
+ {down,E,noproc},
+ make_settle(C1, [0])
+ ],
+ Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0),
+ ?assert(single_active_prop(Conf, Commands, true)),
+ ok.
+
config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) ->
#{name => Name,
max_length => map_max(Length),
@@ -380,6 +557,65 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemor
map_max(0) -> undefined;
map_max(N) -> N.
+single_active_prop(Conf0, Commands, ValidateOrder) ->
+ Conf = Conf0#{release_cursor_interval => 100},
+ Indexes = lists:seq(1, length(Commands)),
+ Entries = lists:zip(Indexes, Commands),
+ %% invariant: there can only be one active consumer at any one time
+ %% there can however be multiple cancelled consumers
+ Invariant = fun (#rabbit_fifo{consumers = Consumers}) ->
+ Up = maps:filter(fun (_, #consumer{status = S}) ->
+ S == up
+ end, Consumers),
+ map_size(Up) =< 1
+ end,
+ try run_log(test_init(Conf), Entries, Invariant) of
+ {_State, Effects} when ValidateOrder ->
+ %% validate message ordering
+ lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event},
+ Acc) ->
+ validate_msg_order({Tag, Pid}, Msgs, Acc);
+ (_, Acc) ->
+ Acc
+ end, -1, Effects),
+ true;
+ _ ->
+ true
+ catch
+ Err ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ ct:pal("Err: ~p~n", [Err]),
+ false
+ end.
+
+%% single active consumer ordering invariant:
+%% only redelivered messages can go backwards
+validate_msg_order(_, [], S) ->
+ S;
+validate_msg_order(Cid, [{_, {H, Num}} | Rem], PrevMax) ->
+ Redelivered = maps:is_key(delivery_count, H),
+ case undefined of
+ _ when Num == PrevMax + 1 ->
+ %% forwards case
+ validate_msg_order(Cid, Rem, Num);
+ _ when Redelivered andalso Num =< PrevMax ->
+ %% the seq is lower but this is a redelivery
+ %% when the consumer changed and the next messages has been redelivered
+ %% we may go backwards but keep the highest seen
+ validate_msg_order(Cid, Rem, PrevMax);
+ _ ->
+ ct:pal("out of order ~w Prev ~w Curr ~w Redel ~w",
+ [Cid, PrevMax, Num, Redelivered]),
+ throw({outoforder, Cid, PrevMax, Num})
+ end.
+
+
+
+
+dump_generated(Conf, Commands) ->
+ ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]),
+ true.
+
snapshots_prop(Conf, Commands) ->
try run_snapshot_test(Conf, Commands) of
_ -> true
@@ -391,6 +627,9 @@ snapshots_prop(Conf, Commands) ->
end.
log_gen(Size) ->
+ log_gen(Size, binary()).
+
+log_gen(Size, _Body) ->
Nodes = [node(),
fakenode@fake,
fakenode@fake2
@@ -413,6 +652,31 @@ log_gen(Size) ->
{1, purge}
]))))).
+log_gen_ordered(Size) ->
+ Nodes = [node(),
+ fakenode@fake,
+ fakenode@fake2
+ ],
+ ?LET(EPids, vector(1, pid_gen(Nodes)),
+ ?LET(CPids, vector(5, pid_gen(Nodes)),
+ resize(Size,
+ list(
+ frequency(
+ [{20, enqueue_gen(oneof(EPids), 10, 0)},
+ {40, {input_event,
+ frequency([{10, settle},
+ {2, return},
+ {1, discard},
+ {1, requeue}])}},
+ {2, checkout_gen(oneof(CPids))},
+ {1, checkout_cancel_gen(oneof(CPids))},
+ {1, down_gen(oneof(EPids ++ CPids))},
+ {1, nodeup_gen(Nodes)}
+ ]))))).
+
+monotonic_gen() ->
+ ?LET(_, integer(), erlang:unique_integer([positive, monotonic])).
+
pid_gen(Nodes) ->
?LET(Node, oneof(Nodes),
test_util:fake_pid(atom_to_binary(Node, utf8))).
@@ -424,9 +688,12 @@ nodeup_gen(Nodes) ->
{nodeup, oneof(Nodes)}.
enqueue_gen(Pid) ->
+ enqueue_gen(Pid, 10, 1).
+
+enqueue_gen(Pid, Enq, Del) ->
?LET(E, {enqueue, Pid,
- frequency([{10, enqueue},
- {1, delay}]),
+ frequency([{Enq, enqueue},
+ {Del, delay}]),
binary()}, E).
checkout_cancel_gen(Pid) ->
@@ -445,16 +712,21 @@ checkout_gen(Pid) ->
enqueuers = #{} :: #{pid() => term()},
consumers = #{} :: #{{binary(), pid()} => term()},
effects = queue:new() :: queue:queue(),
+ %% to transform the body
+ enq_body_fun = {0, fun ra_lib:id/1},
log = [] :: list(),
down = #{} :: #{pid() => noproc | noconnection}
}).
expand(Ops) ->
+ expand(Ops, {undefined, fun ra_lib:id/1}).
+
+expand(Ops, EnqFun) ->
%% execute each command against a rabbit_fifo state and capture all relevant
%% effects
- T = #t{},
+ T = #t{enq_body_fun = EnqFun},
#t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops),
- %% process the remaining effects
+ %% process the remaining effect
#t{log = Log} = lists:foldl(fun do_apply/2,
T1#t{effects = queue:new()},
queue:to_list(Effs)),
@@ -464,6 +736,7 @@ expand(Ops) ->
handle_op({enqueue, Pid, When, Data},
#t{enqueuers = Enqs0,
+ enq_body_fun = {EnqSt0, Fun},
down = Down,
effects = Effs} = T) ->
case Down of
@@ -474,13 +747,17 @@ handle_op({enqueue, Pid, When, Data},
_ ->
Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0),
MsgSeq = maps:get(Pid, Enqs),
- Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data),
+ {EnqSt, Msg} = Fun({EnqSt0, Data}),
+ Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Msg),
case When of
enqueue ->
- do_apply(Cmd, T#t{enqueuers = Enqs});
+ do_apply(Cmd, T#t{enqueuers = Enqs,
+ enq_body_fun = {EnqSt, Fun}});
delay ->
%% just put the command on the effects queue
- T#t{effects = queue:in(Cmd, Effs)}
+ T#t{effects = queue:in(Cmd, Effs),
+ enqueuers = Enqs,
+ enq_body_fun = {EnqSt, Fun}}
end
end;
handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) ->
@@ -540,8 +817,8 @@ handle_op({input_event, Settlement}, #t{effects = Effs,
discard -> rabbit_fifo:make_discard(CId, MsgIds)
end,
do_apply(Cmd, T#t{effects = Q});
- {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue ->
- case maps:is_key(element(2, Cmd), Down) of
+ {{value, {enqueue, Pid, _, _} = Cmd}, Q} ->
+ case maps:is_key(Pid, Down) of
true ->
%% enqueues cannot arrive after down for the same process
%% drop message
@@ -555,21 +832,30 @@ handle_op({input_event, Settlement}, #t{effects = Effs,
handle_op(purge, T) ->
do_apply(rabbit_fifo:make_purge(), T).
-do_apply(Cmd, #t{effects = Effs, index = Index, state = S0,
+
+do_apply(Cmd, #t{effects = Effs,
+ index = Index, state = S0,
+ down = Down,
log = Log} = T) ->
- {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
- {S, _, E} when is_list(E) ->
- {S, E};
- {S, _, E} ->
- {S, [E]};
- {S, _} ->
- {S, []}
- end,
-
- T#t{state = St,
- index = Index + 1,
- effects = enq_effs(Effects, Effs),
- log = [Cmd | Log]}.
+ case Cmd of
+ {enqueue, Pid, _, _} when is_map_key(Pid, Down) ->
+ %% down
+ T;
+ _ ->
+ {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of
+ {S, _, E} when is_list(E) ->
+ {S, E};
+ {S, _, E} ->
+ {S, [E]};
+ {S, _} ->
+ {S, []}
+ end,
+
+ T#t{state = St,
+ index = Index + 1,
+ effects = enq_effs(Effects, Effs),
+ log = [Cmd | Log]}
+ end.
enq_effs([], Q) -> Q;
enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) ->
@@ -637,13 +923,27 @@ prefixes(Source, N, Acc) ->
prefixes(Source, N+1, [X | Acc]).
run_log(InitState, Entries) ->
+ run_log(InitState, Entries, fun(_) -> true end).
+
+run_log(InitState, Entries, InvariantFun) ->
+ Invariant = fun(E, S) ->
+ case InvariantFun(S) of
+ true -> ok;
+ false ->
+ throw({invariant, E, S})
+ end
+ end,
+
lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) ->
case rabbit_fifo:apply(meta(Idx), E, Acc0) of
{Acc, _, Efx} when is_list(Efx) ->
+ Invariant(E, Acc),
{Acc, Efx0 ++ Efx};
{Acc, _, Efx} ->
+ Invariant(E, Acc),
{Acc, Efx0 ++ [Efx]};
{Acc, _} ->
+ Invariant(E, Acc),
{Acc, Efx0}
end
end, {InitState, []}, Entries).