diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 10:06:18 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 10:06:18 +0100 |
| commit | 279a35b6aaf9e9b7ce7394500b9641eadfaebd0a (patch) | |
| tree | f0f675e7a85e01cef62ba5c74f081bbf16452a74 | |
| parent | 19f8dec83bfa899cf7d43e532ca8a52c73213a97 (diff) | |
| parent | 09ab089066e8148ebe30ce3183958ea039470826 (diff) | |
| download | rabbitmq-server-git-279a35b6aaf9e9b7ce7394500b9641eadfaebd0a.tar.gz | |
Merge remote-tracking branch 'origin/master' into in-memory-limits
| Mode | File | Lines changed |
|---|---|---|
| -rw-r--r-- | scripts/rabbitmq-server.bat | 10 |
| -rw-r--r-- | scripts/rabbitmq-service.bat | 10 |
| -rw-r--r-- | src/rabbit_access_control.erl | 24 |
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 2 |
| -rw-r--r-- | src/rabbit_core_ff.erl | 4 |
| -rw-r--r-- | src/rabbit_direct.erl | 11 |
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 15 |
| -rw-r--r-- | src/rabbit_fifo.erl | 365 |
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 |
| -rw-r--r-- | src/rabbit_networking.erl | 1 |
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 39 |
| -rw-r--r-- | src/rabbit_reader.erl | 2 |
| -rw-r--r-- | src/rabbit_table.erl | 2 |
| -rw-r--r-- | test/clustering_management_SUITE.erl | 39 |
| -rw-r--r-- | test/quorum_queue_SUITE.erl | 28 |
| -rw-r--r-- | test/rabbit_fifo_SUITE.erl | 189 |
| -rw-r--r-- | test/rabbit_fifo_int_SUITE.erl | 2 |
| -rw-r--r-- | test/rabbit_fifo_prop_SUITE.erl | 348 |
18 files changed, 850 insertions, 244 deletions
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index 27d0037f7b..d070df7a92 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -48,6 +48,7 @@ if not exist "!ERLANG_HOME!\bin\erl.exe" ( set RABBITMQ_EBIN_ROOT=!RABBITMQ_HOME!\ebin
+CALL :convert_forward_slashes !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE
CALL :get_noex !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE_NOEX
if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
@@ -59,6 +60,7 @@ if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" ( )
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE
CALL :get_noex !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE_NOEX
if "!RABBITMQ_CONFIG_FILE!" == "!RABBITMQ_CONFIG_FILE_NOEX!" (
@@ -100,6 +102,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_FILE!" ( )
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE
CALL :get_noex !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE_NOEX
if not "!RABBITMQ_CONFIG_ARG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_ARG_FILE!" (
@@ -279,6 +282,13 @@ EXIT /B 0 set "%~2=%~dpn1"
EXIT /B 0
+rem Convert unix style path separators into windows style path separators
+rem needed for comparing with _NOEX variables
+rem rabbitmq/rabbitmq-server#1962
+:convert_forward_slashes
+set "%~2=%~dpf1"
+EXIT /B 0
+
endlocal
endlocal
endlocal
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index f85ff75530..6ac0af5dc9 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -131,6 +131,7 @@ if errorlevel 1 ( set RABBITMQ_EBIN_ROOT=!RABBITMQ_HOME!\ebin
+CALL :convert_forward_slashes !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE
CALL :get_noex !RABBITMQ_ADVANCED_CONFIG_FILE! RABBITMQ_ADVANCED_CONFIG_FILE_NOEX
if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" (
@@ -142,6 +143,7 @@ if "!RABBITMQ_ADVANCED_CONFIG_FILE!" == "!RABBITMQ_ADVANCED_CONFIG_FILE_NOEX!" ( )
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE
CALL :get_noex !RABBITMQ_CONFIG_FILE! RABBITMQ_CONFIG_FILE_NOEX
if "!RABBITMQ_CONFIG_FILE!" == "!RABBITMQ_CONFIG_FILE_NOEX!" (
@@ -183,6 +185,7 @@ if "!RABBITMQ_CONFIG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_FILE!" ( )
)
+CALL :convert_forward_slashes !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE
CALL :get_noex !RABBITMQ_CONFIG_ARG_FILE! RABBITMQ_CONFIG_ARG_FILE_NOEX
if not "!RABBITMQ_CONFIG_ARG_FILE_NOEX!.config" == "!RABBITMQ_CONFIG_ARG_FILE!" (
@@ -393,6 +396,13 @@ EXIT /B 0 set "%~2=%~dpn1"
EXIT /B 0
+rem Convert unix style path separators into windows style path separators
+rem needed for comparing with _NOEX variables
+rem rabbitmq/rabbitmq-server#1962
+:convert_forward_slashes
+set "%~2=%~dpf1"
+EXIT /B 0
+
endlocal
endlocal
endlocal
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 984ee5371d..954d003991 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -125,20 +125,30 @@ check_user_loopback(Username, SockOrAddr) -> false -> not_allowed end. --spec check_vhost_access - (rabbit_types:user(), rabbit_types:vhost(), - rabbit_net:socket() | #authz_socket_info{}) -> - 'ok' | rabbit_types:channel_exit(). - +get_authz_data_from({ip, Address}) -> + #{peeraddr => Address}; +get_authz_data_from({socket, Sock}) -> + {ok, {Address, _Port}} = rabbit_net:peername(Sock), + #{peeraddr => Address}; +get_authz_data_from(undefined) -> + undefined. + +% Note: ip can be either a tuple or, a binary if reverse_dns_lookups +% is enabled and it's a direct connection. +-spec check_vhost_access(User :: rabbit_types:user(), + VHostPath :: rabbit_types:vhost(), + AuthzRawData :: {socket, rabbit_net:socket()} | {ip, inet:ip_address() | binary()} | undefined) -> + 'ok' | rabbit_types:channel_exit(). check_vhost_access(User = #user{username = Username, - authz_backends = Modules}, VHostPath, Sock) -> + authz_backends = Modules}, VHostPath, AuthzRawData) -> + AuthzData = get_authz_data_from(AuthzRawData), lists:foldl( fun({Mod, Impl}, ok) -> check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso Mod:check_vhost_access( - auth_user(User, Impl), VHostPath, Sock) + auth_user(User, Impl), VHostPath, AuthzData) end, Mod, "access to vhost '~s' refused for user '~s'", [VHostPath, Username], not_allowed); diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 417f0d6374..2f8e85f0f3 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -113,7 +113,7 @@ internal_check_user_login(Username, Fun) -> Refused end. 
-check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) -> +check_vhost_access(#auth_user{username = Username}, VHostPath, _AuthzData) -> case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index 461a43e5d9..24d2163f16 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -44,12 +44,12 @@ quorum_queue_migration(FeatureName, _FeatureProps, enable) -> Tables = ?quorum_queue_tables, - rabbit_table:wait(Tables), + rabbit_table:wait(Tables, _Retry = true), Fields = amqqueue:fields(amqqueue_v2), migrate_to_amqqueue_with_type(FeatureName, Tables, Fields); quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) -> Tables = ?quorum_queue_tables, - rabbit_table:wait(Tables), + rabbit_table:wait(Tables, _Retry = true), Fields = amqqueue:fields(amqqueue_v2), mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 696b25f5e4..6a3cafbc28 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -181,14 +181,11 @@ notify_auth_result(Username, AuthResult, ExtraProps) -> ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -authz_socket_info_direct(Infos) -> - #authz_socket_info{sockname={proplists:get_value(host, Infos), - proplists:get_value(port, Infos)}, - peername={proplists:get_value(peer_host, Infos), - proplists:get_value(peer_port, Infos)}}. 
- connect1(User, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of + % Note: peer_host can be either a tuple or + % a binary if reverse_dns_lookups is enabled + PeerHost = proplists:get_value(peer_host, Infos), + try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}) of ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 2441587bce..4e5e3d2a28 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -39,7 +39,7 @@ -export([get_disk_free_limit/0, set_disk_free_limit/1, get_min_check_interval/0, set_min_check_interval/1, get_max_check_interval/0, set_max_check_interval/1, - get_disk_free/0]). + get_disk_free/0, set_enabled/1]). -define(SERVER, ?MODULE). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). @@ -112,10 +112,14 @@ set_max_check_interval(Interval) -> gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). -spec get_disk_free() -> (integer() | 'unknown'). +-spec set_enabled(string()) -> 'ok'. get_disk_free() -> gen_server:call(?MODULE, get_disk_free, infinity). +set_enabled(Enabled) -> + gen_server:call(?MODULE, {set_enabled, Enabled}, infinity). 
+ %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -165,6 +169,15 @@ handle_call({set_max_check_interval, MaxInterval}, _From, State) -> handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> {reply, Actual, State}; +handle_call({set_enabled, _Enabled = true}, _From, State) -> + start_timer(set_disk_limits(State, State#state.limit)), + rabbit_log:info("Free disk space monitor was enabled"), + {reply, ok, State#state{enabled = true}}; +handle_call({set_enabled, _Enabled = false}, _From, State) -> + erlang:cancel_timer(State#state.timer), + rabbit_log:info("Free disk space monitor was manually disabled"), + {reply, ok, State#state{enabled = false}}; + handle_call(_Request, _From, State) -> {noreply, State}. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d397459d4f..129dab77ff 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -63,6 +63,7 @@ make_discard/2, make_credit/4, make_purge/0, + make_purge_nodes/1, make_update_config/1 ]). @@ -84,6 +85,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). +-record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). -opaque protocol() :: @@ -94,6 +96,7 @@ #discard{} | #credit{} | #purge{} | + #purge_nodes{} | #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). 
@@ -189,11 +192,9 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0} = State) -> case Cons0 of - #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> - Checked = maps:without(MsgIds, Checked0), + #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = maps:values(Returned), - return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State); + return(Meta, ConsumerId, Returned, [], State); _ -> {State, ok} end; @@ -322,7 +323,7 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, Pid, noconnection}, +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, @@ -330,17 +331,30 @@ apply(_, {down, Pid, noconnection}, Node = node(Pid), %% if the pid refers to the active consumer, mark it as suspected and return %% it to the waiting queue - {State1, Effects0} = case maps:to_list(Cons0) of - [{{_, P} = Cid, C}] when node(P) =:= Node -> - %% the consumer should be returned to waiting - %% - Effs = consumer_update_active_effects( - State0, Cid, C, false, suspected_down, []), - {State0#?MODULE{consumers = #{}, - waiting_consumers = Waiting0 ++ [{Cid, C}]}, - Effs}; - _ -> {State0, []} - end, + {State1, Effects0} = + case maps:to_list(Cons0) of + [{{_, P} = Cid, C0}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + State0, Cid, C0, false, suspected_down, []), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(State0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when 
returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = #{}, + waiting_consumers = Waiting}, + Effs1}; + _ -> {State0, []} + end, WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -354,8 +368,8 @@ apply(_, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - {State#?MODULE{enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, noconnection}, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that @@ -367,23 +381,22 @@ apply(_, {down, Pid, noconnection}, %% all pids for the disconnected node will be marked as suspected not just %% the one we got the `down' command for Node = node(Pid), - ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, - status = up} = C, - {Co, St0, Eff}) when node(P) =:= Node -> - {St, Eff0} = return_all(St0, Checked0, Eff, K, C), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff0), - {maps:put(K, C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {State, Effects1} = + maps:fold( + fun({_, P} = Cid, #consumer{checked_out = Checked0, + status = up} = C0, + {St0, Eff}) when node(P) =:= Node -> + Credit = increase_credit(C0, map_size(Checked0)), + C = C0#consumer{status = suspected_down, + credit = Credit}, + {St, Eff0} = return_all(St0, Eff, Cid, C), + Eff1 = consumer_update_active_effects(St, Cid, C, false, + suspected_down, Eff0), + {St, Eff1}; + (_, _, {St, 
Eff}) -> + {St, Eff} + end, {State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = suspected_down}; (_, E) -> E @@ -392,34 +405,15 @@ apply(_, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects2 = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end ++ Effects1, - %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; -apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, - {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), - % return checked out messages to main queue - % Find the consumers for the down pid - DownConsumers = maps:keys( - maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E, down) - end, {State2, Effects1}, DownConsumers), + Effects = case maps:size(State#?MODULE.consumers) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, _Info}, State0) -> + {State, Effects} = handle_down(Pid, State0), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, @@ -450,16 +444,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, 
Cons0), Waiting = update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + State1 = State0#?MODULE{consumers = Cons1, + enqueuers = Enqs1, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; +apply(_, #purge_nodes{nodes = Nodes}, State0) -> + {State, Effects} = lists:foldl(fun(Node, {S, E}) -> + purge_node(Node, S, E) + end, {State0, []}, Nodes), + {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). +purge_node(Node, State, Effects) -> + lists:foldl(fun(Pid, {S0, E0}) -> + {S, E} = handle_down(Pid, S0), + {S, E0 ++ E} + end, {State, Effects}, all_pids_for(Node, State)). + +%% any downs that re not noconnection +handle_down(Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + % Remove any enqueuer for the same pid and enqueue any pending messages + % This should be ok as we won't see any more enqueues from this pid + State1 = case maps:take(Pid, Enqs0) of + {#enqueuer{pending = Pend}, Enqs} -> + lists:foldl(fun ({_, RIdx, RawMsg}, S) -> + enqueue(RIdx, RawMsg, S) + end, State0#?MODULE{enqueuers = Enqs}, Pend); + error -> + State0 + end, + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), + % return checked out messages to main queue + % Find the consumers for the down pid + DownConsumers = maps:keys( + maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), + lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E, down) + end, {State2, Effects1}, DownConsumers). 
+ consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, @@ -558,8 +586,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, + %% TODO: call a handler that works out if any known nodes need to be + %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, - handle_tick, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, @@ -811,9 +841,9 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, | Effects]. cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> - case maps:take(ConsumerId, C0) of - {Consumer, Cons1} -> - {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, + case C0 of + #{ConsumerId := Consumer} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, Effects0, Reason), %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but @@ -826,7 +856,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> _ -> {S, Effects} end; - error -> + _ -> %% already removed: do nothing {S0, Effects0} end. 
@@ -863,9 +893,9 @@ activate_next_consumer(#?MODULE{consumers = Cons, -maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, - Cons1, #?MODULE{consumers = C0, - service_queue = SQ0} = S0, +maybe_return_all(ConsumerId, Consumer, + #?MODULE{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> case Reason of consumer_cancel -> @@ -875,11 +905,12 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, credit = 0, status = cancelled}, C0, SQ0, Effects0), - {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1}; + {S0#?MODULE{consumers = Cons, + service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, - Consumer), - {S1#?MODULE{consumers = Cons1}, Effects1} + {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), + {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> @@ -1009,41 +1040,46 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, - Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, - SQ0, Effects0), - {State1, Effects2} = lists:foldl( - fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, - ConsumerId, Con); - ({'$empty_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, - ConsumerId, Con); - ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0, - ConsumerId, Con) - end, {State0, Effects1}, MsgNumMsgs), - checkout(Meta, State1#?MODULE{consumers = Cons, - service_queue = SQ}, - Effects2). 
+return(Meta, ConsumerId, Returned, + Effects0, #?MODULE{service_queue = SQ0} = State0) -> + {State1, Effects1} = maps:fold( + fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; + Tag == '$empty_msg'-> + return_one(MsgId, 0, Msg, S0, E0, ConsumerId); + (MsgId, {MsgNum, Msg}, {S0, E0}) -> + return_one(MsgId, MsgNum, Msg, S0, E0, + ConsumerId) + end, {State0, Effects0}, Returned), + #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers, + Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, + {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, + SQ0, Effects1), + State = State1#?MODULE{consumers = Cons, + service_queue = SQ}, + checkout(Meta, State, Effects2). % used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, NumDiscarded, - Con0, Checked, Effects0, +complete(ConsumerId, Discarded, + #consumer{checked_out = Checked} = Con0, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> + MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, NumDiscarded)}, + Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), + credit = increase_credit(Con0, maps:size(Discarded))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), - {State0#?MODULE{consumers = Cons, + State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc) + end, State0, maps:values(Discarded)), + {State1#?MODULE{consumers = Cons, ra_indexes = Indexes, service_queue = SQ}, Effects}. 
@@ -1062,21 +1098,9 @@ increase_credit(#consumer{credit = Current}, Credit) -> complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> - Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), - MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], - State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> - add_bytes_settle(Header, Acc); - ({'$prefix_msg', Header}, Acc) -> - add_bytes_settle(Header, Acc); - ({'$empty_msg', Header}, Acc) -> - add_bytes_settle(Header, Acc) - end, State0, maps:values(Discarded)), - %% need to pass the length of discarded as $prefix_msgs would be filtered - %% by the above list comprehension - {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, - maps:size(Discarded), - Con0, Checked, Effects0, State1), + {State2, Effects1} = complete(ConsumerId, Discarded, Con0, + Effects0, State0), {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, State, Effects). @@ -1136,79 +1160,80 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. 
-return_one(0, {Tag, Header0}, +return_one(MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'-> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - Checked = Con#consumer.checked_out, - {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, - Effects0, State0), - {add_bytes_settle(Header, State1), Effects}; + complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, State1 = case Tag of '$empty_msg' -> State0; _ -> add_in_memory_counts(maps:get(size, Header), State0) end, - {add_bytes_return(Header, - State1#?MODULE{returns = lqueue:in(Msg, Returns)}), + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in(Msg, Returns)}), Effects0} end; -return_one(MsgNum, {RaftId, {Header0, RawMsg}}, +return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> DlMsg = {MsgNum, Msg}, 
- Effects = dead_letter_effects(delivery_limit, - #{none => DlMsg}, + Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), - Checked = Con#consumer.checked_out, - {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, - Effects, State0), - {add_bytes_settle(Header, State1), Effects1}; + complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + %% this should not affect the release cursor in any way State1 = case RawMsg of 'empty' -> State0; _ -> add_in_memory_counts(maps:get(size, Header), State0) end, - %% this should not affect the release cursor in any way - {add_bytes_return(Header, - State1#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> +return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, + #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E, ConsumerId, Consumer); - ({_, {'$empty_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E, ConsumerId, Consumer); - ({_, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) - end, {State0, Effects0}, Checked). + State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) + end, {State, Effects0}, Checked). 
%% checkout new messages to consumers %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, {#{}, #{}}), - case evaluate_limit(State0#?MODULE.ra_indexes, false, - State1, Effects1) of + case evaluate_limit(false, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> @@ -1238,17 +1263,16 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> end, {State0, ok, lists:reverse(Effects1)}. -evaluate_limit(_OldIndexes, Result, +evaluate_limit(Result, #?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(OldIndexes, Result, - State0, Effects0) -> +evaluate_limit(Result, State0, Effects0) -> case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(OldIndexes, true, State, Effects); + evaluate_limit(true, State, Effects); false -> {State0, Result, Effects0} end. @@ -1596,6 +1620,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. +-spec make_purge_nodes([node()]) -> protocol(). +make_purge_nodes(Nodes) -> + #purge_nodes{nodes = Nodes}. + -spec make_update_config(config()) -> protocol(). make_update_config(Config) -> #update_config{config = Config}. @@ -1643,6 +1671,39 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +all_nodes(#?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Nodes0 = maps:fold(fun({_, P}, _, Acc) -> + Acc#{node(P) => ok} + end, #{}, Cons0), + Nodes1 = maps:fold(fun(P, _, Acc) -> + Acc#{node(P) => ok} + end, Nodes0, Enqs0), + maps:keys( + lists:foldl(fun({{_, P}, _}, Acc) -> + Acc#{node(P) => ok} + end, Nodes1, WaitingConsumers0)). 
+ +all_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, _}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + suspected_pids_for(Node, #?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 65383b80af..162badc33d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -945,10 +945,13 @@ with_running_or_clean_mnesia(Fun) -> false -> SavedMnesiaDir = dir(), application:unset_env(mnesia, dir), + SchemaLoc = application:get_env(mnesia, schema_location, opt_disc), + application:set_env(mnesia, schema_location, ram), mnesia:start(), Result = Fun(), application:stop(mnesia), application:set_env(mnesia, dir, SavedMnesiaDir), + application:set_env(mnesia, schema_location, SchemaLoc), Result end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9abbd7cfff..45522b4fa3 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -389,6 +389,7 @@ force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. +-spec failed_to_recv_proxy_header(_, _) -> no_return(). failed_to_recv_proxy_header(Ref, Error) -> Msg = case Error of closed -> "error when receiving proxy header: TCP socket was ~p prematurely"; diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b11a40cead..8258584e3c 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). 
-export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, handle_tick/2]). +-export([become_leader/2, handle_tick/3]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -249,7 +249,9 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, + {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, + Nodes) -> %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), @@ -272,7 +274,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_ready, MR}, {messages_unacknowledged, MU}, {reductions, R}]), - ok = repair_leader_record(QName, Self) + ok = repair_leader_record(QName, Self), + ExpectedNodes = rabbit_mnesia:cluster_nodes(all), + case Nodes -- ExpectedNodes of + [] -> + ok; + Stale -> + rabbit_log:info("~s: stale nodes detected. Purging ~w~n", + [rabbit_misc:rs(QName), Stale]), + %% pipeline purge command + {ok, Q} = rabbit_amqqueue:lookup(QName), + ok = ra:pipeline_command(amqqueue:get_pid(Q), + rabbit_fifo:make_purge_nodes(Stale)), + + ok + end end), ok. 
@@ -663,7 +679,8 @@ add_member(VHost, Name, Node) -> true -> case lists:member(Node, QNodes) of true -> - {error, already_a_member}; + %% idempotent by design + ok; false -> add_member(Q, Node) end @@ -709,16 +726,12 @@ delete_member(VHost, Name, Node) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> QNodes = amqqueue:get_quorum_nodes(Q), - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of + case lists:member(Node, QNodes) of false -> - {error, node_not_running}; + %% idempotent by design + ok; true -> - case lists:member(Node, QNodes) of - false -> - {error, not_a_member}; - true -> - delete_member(Q, Node) - end + delete_member(Q, Node) end; {error, not_found} = E -> E @@ -733,7 +746,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% deleting the last member is not allowed {error, last_node}; _ -> - case ra:leave_and_delete_server(ServerId) of + case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of ok -> Fun = fun(Q1) -> amqqueue:set_quorum_nodes( diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4aea1495a4..aa26cf5482 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1229,7 +1229,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, throttle = Throttle}) -> ok = is_over_connection_limit(VHost, User), - ok = rabbit_access_control:check_vhost_access(User, VHost, Sock), + ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}), ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 60e9c9dac2..7dfc5fb573 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -16,7 +16,7 @@ -module(rabbit_table). 
--export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, +-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, wait/2, force_load/0, is_present/0, is_empty/0, needs_default_data/0, check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, wait_for_replicated/0]). diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index c9f1565c51..40317ec604 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -55,7 +55,8 @@ groups() -> force_boot, status_with_alarm, pid_file_and_await_node_startup, - await_running_count + await_running_count, + start_with_invalid_schema_in_path ]}, {cluster_size_4, [], [ forget_promotes_offline_slave @@ -121,6 +122,42 @@ end_per_testcase(Testcase, Config) -> %% Testcases. %% ------------------------------------------------------------------- + +start_with_invalid_schema_in_path(Config) -> + [Rabbit, Hare] = cluster_members(Config), + stop_app(Rabbit), + stop_app(Hare), + + create_bad_schema(Rabbit, Hare, Config), + + start_app(Hare), + case start_app(Rabbit) of + ok -> ok; + ErrRabbit -> error({unable_to_start_with_bad_schema_in_work_dir, ErrRabbit}) + end. 
+ +create_bad_schema(Rabbit, Hare, Config) -> + + {ok, RabbitMnesiaDir} = rpc:call(Rabbit, application, get_env, [mnesia, dir]), + {ok, HareMnesiaDir} = rpc:call(Hare, application, get_env, [mnesia, dir]), + %% Make sure we don't use the current dir: + PrivDir = ?config(priv_dir, Config), + ct:pal("Priv dir ~p~n", [PrivDir]), + ok = filelib:ensure_dir(filename:join(PrivDir, "file")), + + ok = rpc:call(Rabbit, file, set_cwd, [PrivDir]), + ok = rpc:call(Hare, file, set_cwd, [PrivDir]), + + ok = rpc:call(Rabbit, application, unset_env, [mnesia, dir]), + ok = rpc:call(Hare, application, unset_env, [mnesia, dir]), + ok = rpc:call(Rabbit, mnesia, create_schema, [[Rabbit, Hare]]), + ok = rpc:call(Rabbit, mnesia, start, []), + {atomic,ok} = rpc:call(Rabbit, mnesia, create_table, + [rabbit_queue, [{ram_copies, [Rabbit, Hare]}]]), + stopped = rpc:call(Rabbit, mnesia, stop, []), + ok = rpc:call(Rabbit, application, set_env, [mnesia, dir, RabbitMnesiaDir]), + ok = rpc:call(Hare, application, set_env, [mnesia, dir, HareMnesiaDir]). + join_and_part_cluster(Config) -> [Rabbit, Hare, Bunny] = cluster_members(Config), assert_not_clustered(Rabbit), diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 31d6a4c21b..6b623304e8 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -54,8 +54,9 @@ groups() -> add_member_not_found, delete_member_not_running, delete_member_classic, - delete_member_not_found, - delete_member] + delete_member_queue_not_found, + delete_member, + delete_member_not_a_member] ++ all_tests()}, {cluster_size_2, [], memory_tests()}, {cluster_size_3, [], [ @@ -1235,7 +1236,8 @@ add_member_already_a_member(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({error, already_a_member}, + %% idempotent by design + ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, add_member, [<<"/">>, QQ, Server])). 
@@ -1273,7 +1275,8 @@ delete_member_not_running(Config) -> QQ = ?config(queue_name, Config), ?assertEqual({'queue.declare_ok', QQ, 0, 0}, declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - ?assertEqual({error, node_not_running}, + %% it should be possible to delete members that are not online (e.g. decomissioned) + ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, 'rabbit@burrow'])). @@ -1286,7 +1289,7 @@ delete_member_classic(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, CQ, Server])). -delete_member_not_found(Config) -> +delete_member_queue_not_found(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), QQ = ?config(queue_name, Config), ?assertEqual({error, not_found}, @@ -1302,12 +1305,23 @@ delete_member(Config) -> timer:sleep(100), ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, + [<<"/">>, QQ, Server])). + +delete_member_not_a_member(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + timer:sleep(100), + ?assertEqual(ok, + rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])), - ?assertEqual({error, not_a_member}, + %% idempotent by design + ?assertEqual(ok, rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). - cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. 
A case clause there had gone diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index cf14a68d9d..310553dc56 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -283,6 +283,19 @@ duplicate_enqueue_test(_) -> ?assertNoEffect({send_msg, _, {delivery, _, [{_, {_, first}}]}, _}, Effects3), ok. +return_test(_) -> + Cid = {<<"cid">>, self()}, + Cid2 = {<<"cid2">>, self()}, + {State0, _} = enq(1, 1, msg, test_init(test)), + {State1, _} = check_auto(Cid, 2, State0), + {State2, _} = check_auto(Cid2, 3, State1), + {State3, _, _} = apply(meta(4), rabbit_fifo:make_return(Cid, [0]), State2), + ?assertMatch(#{Cid := #consumer{checked_out = C}} when map_size(C) == 0, + State3#rabbit_fifo.consumers), + ?assertMatch(#{Cid2 := #consumer{checked_out = C2}} when map_size(C2) == 1, + State3#rabbit_fifo.consumers), + ok. + return_non_existent_test(_) -> Cid = {<<"cid">>, self()}, {State0, [_, _Inactive]} = enq(1, 1, second, test_init(test)), @@ -382,14 +395,18 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test(_) -> % monitor both enqueuer and consumer % because we received a noconnection we now need to monitor the node {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), - #consumer{credit = 1} = maps:get(Cid, State2a#rabbit_fifo.consumers), + #consumer{credit = 1, + checked_out = Ch, + status = suspected_down} = maps:get(Cid, State2a#rabbit_fifo.consumers), + ?assertEqual(#{}, Ch), %% validate consumer has credit {State2, _, Effects2} = apply(meta(3), {down, Self, noconnection}, State2a), ?ASSERT_EFF({monitor, node, _}, Effects2), ?assertNoEffect({demonitor, process, _}, Effects2), % when the node comes up we need to retry the process monitors for the % disconnected processes - {_State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + {State3, _, Effects3} = apply(meta(3), {nodeup, Node}, State2), + #consumer{status = up} = maps:get(Cid, State3#rabbit_fifo.consumers), % try to re-monitor the suspect 
processes ?ASSERT_EFF({monitor, process, P}, P =:= Pid, Effects3), ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), @@ -407,6 +424,10 @@ down_with_noconnection_returns_unack_test(_) -> {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, State1), ?assertEqual(0, maps:size(State2a#rabbit_fifo.messages)), ?assertEqual(1, lqueue:len(State2a#rabbit_fifo.returns)), + ?assertMatch(#consumer{checked_out = Ch, + status = suspected_down} + when map_size(Ch) == 0, + maps:get(Cid, State2a#rabbit_fifo.consumers)), ok. down_with_noproc_enqueuer_is_cleaned_up_test(_) -> @@ -426,7 +447,8 @@ discarded_message_without_dead_letter_handler_is_removed_test(_) -> ?ASSERT_EFF({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects1), - {_State2, _, Effects2} = apply(meta(1), rabbit_fifo:make_discard(Cid, [0]), State1), + {_State2, _, Effects2} = apply(meta(1), + rabbit_fifo:make_discard(Cid, [0]), State1), ?assertNoEffect({send_msg, _, {delivery, _, [{0, {#{}, first}}]}, _}, Effects2), @@ -458,9 +480,12 @@ tick_test(_) -> {S3, {_, _}} = deq(4, Cid2, unsettled, S2), {S4, _, _} = apply(meta(5), rabbit_fifo:make_return(Cid, [MsgId]), S3), - [{mod_call, _, _, + [{mod_call, rabbit_quorum_queue, handle_tick, [#resource{}, - {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}]}, {aux, emit}] = rabbit_fifo:tick(1, S4), + {?FUNCTION_NAME, 1, 1, 2, 1, 3, 3}, + [_Node] + ]}, + {aux, emit}] = rabbit_fifo:tick(1, S4), ok. @@ -556,7 +581,7 @@ purge_with_checkout_test(_) -> ?assertEqual(1, maps:size(Checked)), ok. 
-down_returns_checked_out_in_order_test(_) -> +down_noproc_returns_checked_out_in_order_test(_) -> S0 = test_init(?FUNCTION_NAME), %% enqueue 100 S1 = lists:foldl(fun (Num, FS0) -> @@ -572,6 +597,30 @@ down_returns_checked_out_in_order_test(_) -> {S, _, _} = apply(meta(102), {down, self(), noproc}, S2), Returns = lqueue:to_list(S#rabbit_fifo.returns), ?assertEqual(100, length(Returns)), + ?assertEqual(0, maps:size(S#rabbit_fifo.consumers)), + %% validate returns are in order + ?assertEqual(lists:sort(Returns), Returns), + ok. + +down_noconnection_returns_checked_out_test(_) -> + S0 = test_init(?FUNCTION_NAME), + NumMsgs = 20, + S1 = lists:foldl(fun (Num, FS0) -> + {FS, _} = enq(Num, Num, Num, FS0), + FS + end, S0, lists:seq(1, NumMsgs)), + ?assertEqual(NumMsgs, maps:size(S1#rabbit_fifo.messages)), + Cid = {<<"cid">>, self()}, + {S2, _} = check(Cid, 101, 1000, S1), + #consumer{checked_out = Checked} = maps:get(Cid, S2#rabbit_fifo.consumers), + ?assertEqual(NumMsgs, maps:size(Checked)), + %% simulate down + {S, _, _} = apply(meta(102), {down, self(), noconnection}, S2), + Returns = lqueue:to_list(S#rabbit_fifo.returns), + ?assertEqual(NumMsgs, length(Returns)), + ?assertMatch(#consumer{checked_out = Ch} + when map_size(Ch) == 0, + maps:get(Cid, S#rabbit_fifo.consumers)), %% validate returns are in order ?assertEqual(lists:sort(Returns), Returns), ok. @@ -736,6 +785,41 @@ single_active_consumer_cancel_consumer_when_channel_is_down_test(_) -> ok. 
+single_active_returns_messages_on_noconnection_test(_) -> + R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => R, + release_cursor_interval => 0, + single_active_consumer_on => true}), + Meta = #{index => 1}, + Nodes = [n1], + ConsumerIds = [{_, DownPid}] = + [begin + B = atom_to_binary(N, utf8), + {<<"ctag_", B/binary>>, + test_util:fake_pid(N)} + end || N <- Nodes], + % adding some consumers + State1 = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), + {State2, _} = enq(4, 1, msg1, State1), + % simulate node goes down + {State3, _, _} = apply(meta(5), {down, DownPid, noconnection}, State2), + %% assert the consumer is up + ?assertMatch([_], lqueue:to_list(State3#rabbit_fifo.returns)), + ?assertMatch([{_, #consumer{checked_out = Checked}}] + when map_size(Checked) == 0, + State3#rabbit_fifo.waiting_consumers), + + ok. 
+ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> R = rabbit_misc:r("/", queue, atom_to_binary(?FUNCTION_NAME, utf8)), State0 = init(#{name => ?FUNCTION_NAME, @@ -751,26 +835,30 @@ single_active_consumer_replaces_consumer_when_down_noconnection_test(_) -> test_util:fake_pid(N)} end || N <- Nodes], % adding some consumers - State1 = lists:foldl( - fun(CId, Acc0) -> - {Acc, _, _} = - apply(Meta, - make_checkout(CId, - {once, 1, simple_prefetch}, #{}), - Acc0), - Acc - end, State0, ConsumerIds), + State1a = lists:foldl( + fun(CId, Acc0) -> + {Acc, _, _} = + apply(Meta, + make_checkout(CId, + {once, 1, simple_prefetch}, #{}), + Acc0), + Acc + end, State0, ConsumerIds), %% assert the consumer is up ?assertMatch(#{C1 := #consumer{status = up}}, - State1#rabbit_fifo.consumers), + State1a#rabbit_fifo.consumers), + + {State1, _} = enq(10, 1, msg, State1a), % simulate node goes down {State2, _, _} = apply(meta(5), {down, DownPid, noconnection}, State1), %% assert a new consumer is in place and it is up - ?assertMatch([{C2, #consumer{status = up}}], - maps:to_list(State2#rabbit_fifo.consumers)), + ?assertMatch([{C2, #consumer{status = up, + checked_out = Ch}}] + when map_size(Ch) == 1, + maps:to_list(State2#rabbit_fifo.consumers)), %% the disconnected consumer has been returned to waiting ?assert(lists:any(fun ({C,_}) -> C =:= C1 end, @@ -836,10 +924,11 @@ single_active_consumer_all_disconnected_test(_) -> single_active_consumer_state_enter_leader_include_waiting_consumers_test(_) -> State0 = init(#{name => ?FUNCTION_NAME, - queue_resource => rabbit_misc:r("/", queue, - atom_to_binary(?FUNCTION_NAME, utf8)), - release_cursor_interval => 0, - single_active_consumer_on => true}), + queue_resource => + rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + release_cursor_interval => 0, + single_active_consumer_on => true}), DummyFunction = fun() -> ok end, Pid1 = spawn(DummyFunction), @@ -990,11 +1079,11 @@ 
active_flag_updated_when_consumer_suspected_unsuspected_test(_) -> State1 = lists:foldl(AddConsumer, State0, [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(#{index => 3}, {down, Pid1, noconnection}, State1), % 1 effect to update the metrics of each consumer (they belong to the same node), 1 more effect to monitor the node ?assertEqual(4 + 1, length(Effects2)), - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(#{index => 4}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to update the metrics, 1 effect to monitor the consumer PID ?assertEqual(4 + 4, length(Effects3)). @@ -1023,11 +1112,11 @@ active_flag_not_updated_when_consumer_suspected_unsuspected_and_single_active_co [{<<"ctag1">>, Pid1}, {<<"ctag2">>, Pid2}, {<<"ctag3">>, Pid2}, {<<"ctag4">>, Pid3}]), - {State2, _, Effects2} = apply(#{}, {down, Pid1, noconnection}, State1), + {State2, _, Effects2} = apply(#{index => 2}, {down, Pid1, noconnection}, State1), % one monitor and one consumer status update (deactivated) ?assertEqual(3, length(Effects2)), - {_, _, Effects3} = apply(#{index => 1}, {nodeup, node(self())}, State2), + {_, _, Effects3} = apply(#{index => 3}, {nodeup, node(self())}, State2), % for each consumer: 1 effect to monitor the consumer PID ?assertEqual(5, length(Effects3)). @@ -1118,6 +1207,54 @@ single_active_with_credited_test(_) -> State3#rabbit_fifo.waiting_consumers), ok. 
+purge_nodes_test(_) -> + Node = purged@node, + ThisNode = node(), + EnqPid = test_util:fake_pid(Node), + EnqPid2 = test_util:fake_pid(node()), + ConPid = test_util:fake_pid(Node), + Cid = {<<"tag">>, ConPid}, + % WaitingPid = test_util:fake_pid(Node), + + State0 = init(#{name => ?FUNCTION_NAME, + queue_resource => rabbit_misc:r("/", queue, + atom_to_binary(?FUNCTION_NAME, utf8)), + single_active_consumer_on => false}), + {State1, _, _} = apply(meta(1), + rabbit_fifo:make_enqueue(EnqPid, 1, msg1), + State0), + {State2, _, _} = apply(meta(2), + rabbit_fifo:make_enqueue(EnqPid2, 1, msg2), + State1), + {State3, _} = check(Cid, 3, 1000, State2), + {State4, _, _} = apply(meta(4), + {down, EnqPid, noconnection}, + State3), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode, Node] + ]}, + {aux, emit}] , rabbit_fifo:tick(1, State4)), + %% assert there are both enqueuers and consumers + {State, _, _} = apply(meta(5), + rabbit_fifo:make_purge_nodes([Node]), + State4), + + %% assert there are no enqueuers nor consumers + ?assertMatch(#rabbit_fifo{enqueuers = Enqs} when map_size(Enqs) == 1, + State), + + ?assertMatch(#rabbit_fifo{consumers = Cons} when map_size(Cons) == 0, + State), + ?assertMatch( + [{mod_call, rabbit_quorum_queue, handle_tick, + [#resource{}, _Metrics, + [ThisNode] + ]}, + {aux, emit}] , rabbit_fifo:tick(1, State)), + ok. + meta(Idx) -> #{index => Idx, term => 1}. 
diff --git a/test/rabbit_fifo_int_SUITE.erl b/test/rabbit_fifo_int_SUITE.erl index f281d15795..d4ae417a78 100644 --- a/test/rabbit_fifo_int_SUITE.erl +++ b/test/rabbit_fifo_int_SUITE.erl @@ -54,7 +54,7 @@ end_per_group(_, Config) -> init_per_testcase(TestCase, Config) -> meck:new(rabbit_quorum_queue, [passthrough]), - meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _) -> ok end), + meck:expect(rabbit_quorum_queue, handle_tick, fun (_, _, _) -> ok end), meck:expect(rabbit_quorum_queue, cancel_consumer_handler, fun (_, _) -> ok end), ra_server_sup_sup:remove_all(), diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index 58e6c2452c..d670f2bdd7 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -8,6 +8,8 @@ -include_lib("proper/include/proper.hrl"). -include_lib("common_test/include/ct.hrl"). -include_lib("eunit/include/eunit.hrl"). +-include_lib("ra/include/ra.hrl"). +-include("src/rabbit_fifo.hrl"). %%%=================================================================== %%% Common Test callbacks @@ -21,6 +23,7 @@ all() -> all_tests() -> [ + test_run_log, snapshots, scenario1, scenario2, @@ -41,7 +44,14 @@ all_tests() -> scenario17, scenario18, scenario19, - scenario20 + scenario20, + single_active, + single_active_01, + single_active_02, + single_active_03, + single_active_ordering, + single_active_ordering_01 + % single_active_ordering_02 ]. groups() -> @@ -288,7 +298,9 @@ scenario17(_Config) -> make_checkout(C2, cancel), make_enqueue(E,2,<<"two">>), {nodeup,rabbit@fake_node1}, + %% this has no effect as was returned make_settle(C1, [0]), + %% this should settle "one" make_settle(C1, [1]) ], run_snapshot_test(#{name => ?FUNCTION_NAME, @@ -343,6 +355,87 @@ scenario20(_Config) -> max_in_memory_length => 1}, Commands), ok. 
+single_active_01(_Config) -> + C1Pid = test_util:fake_pid(rabbit@fake_node1), + C1 = {<<0>>, C1Pid}, + C2Pid = test_util:fake_pid(rabbit@fake_node2), + C2 = {<<>>, C2Pid}, + E = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<"one">>), + make_checkout(C2, {auto,1,simple_prefetch}), + make_checkout(C1, cancel), + {nodeup,rabbit@fake_node1} + ], + ?assert( + single_active_prop(#{name => ?FUNCTION_NAME, + single_active_consumer_on => true + }, Commands, false)), + ok. + +single_active_02(_Config) -> + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + C2Pid = test_util:fake_pid(node()), + C2 = {<<>>, C2Pid}, + E = test_util:fake_pid(node()), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E,1,<<"one">>), + {down,E,noconnection}, + make_checkout(C2, {auto,1,simple_prefetch}), + make_checkout(C2, cancel), + {down,E,noconnection} + ], + Conf = config(?FUNCTION_NAME, undefined, undefined, true, 1, undefined, undefined), + ?assert(single_active_prop(Conf, Commands, false)), + ok. + +single_active_03(_Config) -> + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + % C2Pid = test_util:fake_pid(rabbit@fake_node2), + % C2 = {<<>>, C2Pid}, + Pid = test_util:fake_pid(node()), + E = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E, 1, 0), + make_enqueue(E, 2, 1), + {down, Pid, noconnection}, + {nodeup, node()} + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, undefined, undefined), + ?assert(single_active_prop(Conf, Commands, true)), + ok. 
+ +test_run_log(_Config) -> + Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, + run_proper( + fun () -> + ?FORALL({Length, Bytes, SingleActiveConsumer, DeliveryLimit, InMemoryLength, + InMemoryBytes}, + frequency([{10, {0, 0, false, 0, 0, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + boolean(), + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(100), expand(Ops, Fun)), + collect({log_size, length(O)}, + dump_generated( + config(?FUNCTION_NAME, + Length, + Bytes, + SingleActiveConsumer, + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O)))) + end, [], 10). + snapshots(_Config) -> run_proper( fun () -> @@ -368,6 +461,90 @@ snapshots(_Config) -> InMemoryBytes), O)))) end, [], 2500). +single_active(_Config) -> + Size = 2000, + run_proper( + fun () -> + ?FORALL({Length, Bytes, DeliveryLimit, InMemoryLength, InMemoryBytes}, + frequency([{10, {0, 0, 0, 0, 0}}, + {5, {oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]), + oneof([range(1, 3), undefined]), + oneof([range(1, 10), undefined]), + oneof([range(1, 1000), undefined]) + }}]), + ?FORALL(O, ?LET(Ops, log_gen(Size), expand(Ops)), + collect({log_size, length(O)}, + single_active_prop( + config(?FUNCTION_NAME, + Length, + Bytes, + true, + DeliveryLimit, + InMemoryLength, + InMemoryBytes), O, + false)))) + end, [], Size). + +single_active_ordering(_Config) -> + Size = 2000, + Fun = {-1, fun ({Prev, _}) -> {Prev + 1, Prev + 1} end}, + run_proper( + fun () -> + ?FORALL(O, ?LET(Ops, log_gen_ordered(Size), expand(Ops, Fun)), + collect({log_size, length(O)}, + single_active_prop(config(?FUNCTION_NAME, + undefined, + undefined, + true, + undefined, + undefined, + undefined), O, + true))) + end, [], Size). 
+ +single_active_ordering_01(_Config) -> +% [{enqueue,<0.145.0>,1,0}, +% {enqueue,<0.145.0>,1,1}, +% {checkout,{<<>>,<0.148.0>},{auto,1,simple_prefetch},#{ack => true,args => [],prefetch => 1,username => <<117,115,101,114>>}} +% {enqueue,<0.140.0>,1,2}, +% {settle,{<<>>,<0.148.0>},[0]}] + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + E = test_util:fake_pid(rabbit@fake_node2), + E2 = test_util:fake_pid(rabbit@fake_node2), + Commands = [ + make_enqueue(E, 1, 0), + make_enqueue(E, 2, 1), + make_checkout(C1, {auto,2,simple_prefetch}), + make_enqueue(E2, 1, 2), + make_settle(C1, [0]) + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0), + ?assert(single_active_prop(Conf, Commands, true)), + ok. + +single_active_ordering_02(_Config) -> + %% this results in the pending enqueue being enqueued and violating + %% ordering +% [{checkout, % {<<>>,<0.177.0>}, % {auto,1,simple_prefetch}, +% {enqueue,<0.172.0>,2,1}, +% {down,<0.172.0>,noproc}, +% {settle,{<<>>,<0.177.0>},[0]}] + C1Pid = test_util:fake_pid(node()), + C1 = {<<0>>, C1Pid}, + E = test_util:fake_pid(node()), + Commands = [ + make_checkout(C1, {auto,1,simple_prefetch}), + make_enqueue(E, 2, 1), + %% CANNOT HAPPEN + {down,E,noproc}, + make_settle(C1, [0]) + ], + Conf = config(?FUNCTION_NAME, 0, 0, true, 0, 0, 0), + ?assert(single_active_prop(Conf, Commands, true)), + ok. + config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemoryBytes) -> #{name => Name, max_length => map_max(Length), @@ -380,6 +557,65 @@ config(Name, Length, Bytes, SingleActive, DeliveryLimit, InMemoryLength, InMemor map_max(0) -> undefined; map_max(N) -> N. 
+single_active_prop(Conf0, Commands, ValidateOrder) -> + Conf = Conf0#{release_cursor_interval => 100}, + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + %% invariant: there can only be one active consumer at any one time + %% there can however be multiple cancelled consumers + Invariant = fun (#rabbit_fifo{consumers = Consumers}) -> + Up = maps:filter(fun (_, #consumer{status = S}) -> + S == up + end, Consumers), + map_size(Up) =< 1 + end, + try run_log(test_init(Conf), Entries, Invariant) of + {_State, Effects} when ValidateOrder -> + %% validate message ordering + lists:foldl(fun ({send_msg, Pid, {delivery, Tag, Msgs}, ra_event}, + Acc) -> + validate_msg_order({Tag, Pid}, Msgs, Acc); + (_, Acc) -> + Acc + end, -1, Effects), + true; + _ -> + true + catch + Err -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + ct:pal("Err: ~p~n", [Err]), + false + end. + +%% single active consumer ordering invariant: +%% only redelivered messages can go backwards +validate_msg_order(_, [], S) -> + S; +validate_msg_order(Cid, [{_, {H, Num}} | Rem], PrevMax) -> + Redelivered = maps:is_key(delivery_count, H), + case undefined of + _ when Num == PrevMax + 1 -> + %% forwards case + validate_msg_order(Cid, Rem, Num); + _ when Redelivered andalso Num =< PrevMax -> + %% the seq is lower but this is a redelivery + %% when the consumer changed and the next messages has been redelivered + %% we may go backwards but keep the highest seen + validate_msg_order(Cid, Rem, PrevMax); + _ -> + ct:pal("out of order ~w Prev ~w Curr ~w Redel ~w", + [Cid, PrevMax, Num, Redelivered]), + throw({outoforder, Cid, PrevMax, Num}) + end. + + + + +dump_generated(Conf, Commands) -> + ct:pal("Commands: ~p~nConf~p~n", [Commands, Conf]), + true. + snapshots_prop(Conf, Commands) -> try run_snapshot_test(Conf, Commands) of _ -> true @@ -391,6 +627,9 @@ snapshots_prop(Conf, Commands) -> end. log_gen(Size) -> + log_gen(Size, binary()). 
+ +log_gen(Size, _Body) -> Nodes = [node(), fakenode@fake, fakenode@fake2 @@ -413,6 +652,31 @@ log_gen(Size) -> {1, purge} ]))))). +log_gen_ordered(Size) -> + Nodes = [node(), + fakenode@fake, + fakenode@fake2 + ], + ?LET(EPids, vector(1, pid_gen(Nodes)), + ?LET(CPids, vector(5, pid_gen(Nodes)), + resize(Size, + list( + frequency( + [{20, enqueue_gen(oneof(EPids), 10, 0)}, + {40, {input_event, + frequency([{10, settle}, + {2, return}, + {1, discard}, + {1, requeue}])}}, + {2, checkout_gen(oneof(CPids))}, + {1, checkout_cancel_gen(oneof(CPids))}, + {1, down_gen(oneof(EPids ++ CPids))}, + {1, nodeup_gen(Nodes)} + ]))))). + +monotonic_gen() -> + ?LET(_, integer(), erlang:unique_integer([positive, monotonic])). + pid_gen(Nodes) -> ?LET(Node, oneof(Nodes), test_util:fake_pid(atom_to_binary(Node, utf8))). @@ -424,9 +688,12 @@ nodeup_gen(Nodes) -> {nodeup, oneof(Nodes)}. enqueue_gen(Pid) -> + enqueue_gen(Pid, 10, 1). + +enqueue_gen(Pid, Enq, Del) -> ?LET(E, {enqueue, Pid, - frequency([{10, enqueue}, - {1, delay}]), + frequency([{Enq, enqueue}, + {Del, delay}]), binary()}, E). checkout_cancel_gen(Pid) -> @@ -445,16 +712,21 @@ checkout_gen(Pid) -> enqueuers = #{} :: #{pid() => term()}, consumers = #{} :: #{{binary(), pid()} => term()}, effects = queue:new() :: queue:queue(), + %% to transform the body + enq_body_fun = {0, fun ra_lib:id/1}, log = [] :: list(), down = #{} :: #{pid() => noproc | noconnection} }). expand(Ops) -> + expand(Ops, {undefined, fun ra_lib:id/1}). 
+ +expand(Ops, EnqFun) -> %% execute each command against a rabbit_fifo state and capture all relevant %% effects - T = #t{}, + T = #t{enq_body_fun = EnqFun}, #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), - %% process the remaining effects + %% process the remaining effect #t{log = Log} = lists:foldl(fun do_apply/2, T1#t{effects = queue:new()}, queue:to_list(Effs)), @@ -464,6 +736,7 @@ expand(Ops) -> handle_op({enqueue, Pid, When, Data}, #t{enqueuers = Enqs0, + enq_body_fun = {EnqSt0, Fun}, down = Down, effects = Effs} = T) -> case Down of @@ -474,13 +747,17 @@ handle_op({enqueue, Pid, When, Data}, _ -> Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), MsgSeq = maps:get(Pid, Enqs), - Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Data), + {EnqSt, Msg} = Fun({EnqSt0, Data}), + Cmd = rabbit_fifo:make_enqueue(Pid, MsgSeq, Msg), case When of enqueue -> - do_apply(Cmd, T#t{enqueuers = Enqs}); + do_apply(Cmd, T#t{enqueuers = Enqs, + enq_body_fun = {EnqSt, Fun}}); delay -> %% just put the command on the effects queue - T#t{effects = queue:in(Cmd, Effs)} + T#t{effects = queue:in(Cmd, Effs), + enqueuers = Enqs, + enq_body_fun = {EnqSt, Fun}} end end; handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> @@ -540,8 +817,8 @@ handle_op({input_event, Settlement}, #t{effects = Effs, discard -> rabbit_fifo:make_discard(CId, MsgIds) end, do_apply(Cmd, T#t{effects = Q}); - {{value, Cmd}, Q} when element(1, Cmd) =:= enqueue -> - case maps:is_key(element(2, Cmd), Down) of + {{value, {enqueue, Pid, _, _} = Cmd}, Q} -> + case maps:is_key(Pid, Down) of true -> %% enqueues cannot arrive after down for the same process %% drop message @@ -555,21 +832,30 @@ handle_op({input_event, Settlement}, #t{effects = Effs, handle_op(purge, T) -> do_apply(rabbit_fifo:make_purge(), T). 
-do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, + +do_apply(Cmd, #t{effects = Effs, + index = Index, state = S0, + down = Down, log = Log} = T) -> - {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of - {S, _, E} when is_list(E) -> - {S, E}; - {S, _, E} -> - {S, [E]}; - {S, _} -> - {S, []} - end, - - T#t{state = St, - index = Index + 1, - effects = enq_effs(Effects, Effs), - log = [Cmd | Log]}. + case Cmd of + {enqueue, Pid, _, _} when is_map_key(Pid, Down) -> + %% down + T; + _ -> + {St, Effects} = case rabbit_fifo:apply(#{index => Index}, Cmd, S0) of + {S, _, E} when is_list(E) -> + {S, E}; + {S, _, E} -> + {S, [E]}; + {S, _} -> + {S, []} + end, + + T#t{state = St, + index = Index + 1, + effects = enq_effs(Effects, Effs), + log = [Cmd | Log]} + end. enq_effs([], Q) -> Q; enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> @@ -637,13 +923,27 @@ prefixes(Source, N, Acc) -> prefixes(Source, N+1, [X | Acc]). run_log(InitState, Entries) -> + run_log(InitState, Entries, fun(_) -> true end). + +run_log(InitState, Entries, InvariantFun) -> + Invariant = fun(E, S) -> + case InvariantFun(S) of + true -> ok; + false -> + throw({invariant, E, S}) + end + end, + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> case rabbit_fifo:apply(meta(Idx), E, Acc0) of {Acc, _, Efx} when is_list(Efx) -> + Invariant(E, Acc), {Acc, Efx0 ++ Efx}; {Acc, _, Efx} -> + Invariant(E, Acc), {Acc, Efx0 ++ [Efx]}; {Acc, _} -> + Invariant(E, Acc), {Acc, Efx0} end end, {InitState, []}, Entries). |
