diff options
| author | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 10:06:18 +0100 |
|---|---|---|
| committer | Diana Corbacho <diana@rabbitmq.com> | 2019-04-04 10:06:18 +0100 |
| commit | 279a35b6aaf9e9b7ce7394500b9641eadfaebd0a (patch) | |
| tree | f0f675e7a85e01cef62ba5c74f081bbf16452a74 /src | |
| parent | 19f8dec83bfa899cf7d43e532ca8a52c73213a97 (diff) | |
| parent | 09ab089066e8148ebe30ce3183958ea039470826 (diff) | |
| download | rabbitmq-server-git-279a35b6aaf9e9b7ce7394500b9641eadfaebd0a.tar.gz | |
Merge remote-tracking branch 'origin/master' into in-memory-limits
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_access_control.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_auth_backend_internal.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_core_ff.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_direct.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_disk_monitor.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 365 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 3 | ||||
| -rw-r--r-- | src/rabbit_networking.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 39 | ||||
| -rw-r--r-- | src/rabbit_reader.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 2 |
11 files changed, 283 insertions, 185 deletions
diff --git a/src/rabbit_access_control.erl b/src/rabbit_access_control.erl index 984ee5371d..954d003991 100644 --- a/src/rabbit_access_control.erl +++ b/src/rabbit_access_control.erl @@ -125,20 +125,30 @@ check_user_loopback(Username, SockOrAddr) -> false -> not_allowed end. --spec check_vhost_access - (rabbit_types:user(), rabbit_types:vhost(), - rabbit_net:socket() | #authz_socket_info{}) -> - 'ok' | rabbit_types:channel_exit(). - +get_authz_data_from({ip, Address}) -> + #{peeraddr => Address}; +get_authz_data_from({socket, Sock}) -> + {ok, {Address, _Port}} = rabbit_net:peername(Sock), + #{peeraddr => Address}; +get_authz_data_from(undefined) -> + undefined. + +% Note: ip can be either a tuple or, a binary if reverse_dns_lookups +% is enabled and it's a direct connection. +-spec check_vhost_access(User :: rabbit_types:user(), + VHostPath :: rabbit_types:vhost(), + AuthzRawData :: {socket, rabbit_net:socket()} | {ip, inet:ip_address() | binary()} | undefined) -> + 'ok' | rabbit_types:channel_exit(). check_vhost_access(User = #user{username = Username, - authz_backends = Modules}, VHostPath, Sock) -> + authz_backends = Modules}, VHostPath, AuthzRawData) -> + AuthzData = get_authz_data_from(AuthzRawData), lists:foldl( fun({Mod, Impl}, ok) -> check_access( fun() -> rabbit_vhost:exists(VHostPath) andalso Mod:check_vhost_access( - auth_user(User, Impl), VHostPath, Sock) + auth_user(User, Impl), VHostPath, AuthzData) end, Mod, "access to vhost '~s' refused for user '~s'", [VHostPath, Username], not_allowed); diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl index 417f0d6374..2f8e85f0f3 100644 --- a/src/rabbit_auth_backend_internal.erl +++ b/src/rabbit_auth_backend_internal.erl @@ -113,7 +113,7 @@ internal_check_user_login(Username, Fun) -> Refused end. -check_vhost_access(#auth_user{username = Username}, VHostPath, _Sock) -> +check_vhost_access(#auth_user{username = Username}, VHostPath, _AuthzData) -> case mnesia:dirty_read({rabbit_user_permission, #user_vhost{username = Username, virtual_host = VHostPath}}) of diff --git a/src/rabbit_core_ff.erl b/src/rabbit_core_ff.erl index 461a43e5d9..24d2163f16 100644 --- a/src/rabbit_core_ff.erl +++ b/src/rabbit_core_ff.erl @@ -44,12 +44,12 @@ quorum_queue_migration(FeatureName, _FeatureProps, enable) -> Tables = ?quorum_queue_tables, - rabbit_table:wait(Tables), + rabbit_table:wait(Tables, _Retry = true), Fields = amqqueue:fields(amqqueue_v2), migrate_to_amqqueue_with_type(FeatureName, Tables, Fields); quorum_queue_migration(_FeatureName, _FeatureProps, is_enabled) -> Tables = ?quorum_queue_tables, - rabbit_table:wait(Tables), + rabbit_table:wait(Tables, _Retry = true), Fields = amqqueue:fields(amqqueue_v2), mnesia:table_info(rabbit_queue, attributes) =:= Fields andalso mnesia:table_info(rabbit_durable_queue, attributes) =:= Fields. diff --git a/src/rabbit_direct.erl b/src/rabbit_direct.erl index 696b25f5e4..6a3cafbc28 100644 --- a/src/rabbit_direct.erl +++ b/src/rabbit_direct.erl @@ -181,14 +181,11 @@ notify_auth_result(Username, AuthResult, ExtraProps) -> ExtraProps, rabbit_event:notify(AuthResult, [P || {_, V} = P <- EventProps, V =/= '']). -authz_socket_info_direct(Infos) -> - #authz_socket_info{sockname={proplists:get_value(host, Infos), - proplists:get_value(port, Infos)}, - peername={proplists:get_value(peer_host, Infos), - proplists:get_value(peer_port, Infos)}}. - connect1(User, VHost, Protocol, Pid, Infos) -> - try rabbit_access_control:check_vhost_access(User, VHost, authz_socket_info_direct(Infos)) of + % Note: peer_host can be either a tuple or + % a binary if reverse_dns_lookups is enabled + PeerHost = proplists:get_value(peer_host, Infos), + try rabbit_access_control:check_vhost_access(User, VHost, {ip, PeerHost}) of ok -> ok = pg_local:join(rabbit_direct, Pid), rabbit_core_metrics:connection_created(Pid, Infos), rabbit_event:notify(connection_created, Infos), diff --git a/src/rabbit_disk_monitor.erl b/src/rabbit_disk_monitor.erl index 2441587bce..4e5e3d2a28 100644 --- a/src/rabbit_disk_monitor.erl +++ b/src/rabbit_disk_monitor.erl @@ -39,7 +39,7 @@ -export([get_disk_free_limit/0, set_disk_free_limit/1, get_min_check_interval/0, set_min_check_interval/1, get_max_check_interval/0, set_max_check_interval/1, - get_disk_free/0]). + get_disk_free/0, set_enabled/1]). -define(SERVER, ?MODULE). -define(DEFAULT_MIN_DISK_CHECK_INTERVAL, 100). @@ -112,10 +112,14 @@ set_max_check_interval(Interval) -> gen_server:call(?MODULE, {set_max_check_interval, Interval}, infinity). -spec get_disk_free() -> (integer() | 'unknown'). +-spec set_enabled(string()) -> 'ok'. get_disk_free() -> gen_server:call(?MODULE, get_disk_free, infinity). +set_enabled(Enabled) -> + gen_server:call(?MODULE, {set_enabled, Enabled}, infinity). + %%---------------------------------------------------------------------------- %% gen_server callbacks %%---------------------------------------------------------------------------- @@ -165,6 +169,15 @@ handle_call({set_max_check_interval, MaxInterval}, _From, State) -> handle_call(get_disk_free, _From, State = #state { actual = Actual }) -> {reply, Actual, State}; +handle_call({set_enabled, _Enabled = true}, _From, State) -> + start_timer(set_disk_limits(State, State#state.limit)), + rabbit_log:info("Free disk space monitor was enabled"), + {reply, ok, State#state{enabled = true}}; +handle_call({set_enabled, _Enabled = false}, _From, State) -> + erlang:cancel_timer(State#state.timer), + rabbit_log:info("Free disk space monitor was manually disabled"), + {reply, ok, State#state{enabled = false}}; + handle_call(_Request, _From, State) -> {noreply, State}. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index d397459d4f..129dab77ff 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -63,6 +63,7 @@ make_discard/2, make_credit/4, make_purge/0, + make_purge_nodes/1, make_update_config/1 ]). @@ -84,6 +85,7 @@ delivery_count :: non_neg_integer(), drain :: boolean()}). -record(purge, {}). +-record(purge_nodes, {nodes :: [node()]}). -record(update_config, {config :: config()}). -opaque protocol() :: @@ -94,6 +96,7 @@ #discard{} | #credit{} | #purge{} | + #purge_nodes{} | #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). @@ -189,11 +192,9 @@ apply(Meta, #discard{msg_ids = MsgIds, consumer_id = ConsumerId}, apply(Meta, #return{msg_ids = MsgIds, consumer_id = ConsumerId}, #?MODULE{consumers = Cons0} = State) -> case Cons0 of - #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> - Checked = maps:without(MsgIds, Checked0), + #{ConsumerId := #consumer{checked_out = Checked0}} -> Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = maps:values(Returned), - return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, [], State); + return(Meta, ConsumerId, Returned, [], State); _ -> {State, ok} end; @@ -322,7 +323,7 @@ apply(#{index := RaftIdx}, #purge{}, %% reverse the effects ourselves {State, {purge, Total}, lists:reverse([garbage_collection | Effects])}; -apply(_, {down, Pid, noconnection}, +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, cfg = #cfg{consumer_strategy = single_active}, waiting_consumers = Waiting0, @@ -330,17 +331,30 @@ apply(_, {down, Pid, noconnection}, Node = node(Pid), %% if the pid refers to the active consumer, mark it as suspected and return %% it to the waiting queue - {State1, Effects0} = case maps:to_list(Cons0) of - [{{_, P} = Cid, C}] when node(P) =:= Node -> - %% the consumer should be returned to waiting - %% - Effs = consumer_update_active_effects( - State0, Cid, C, false, suspected_down, []), - {State0#?MODULE{consumers = #{}, - waiting_consumers = Waiting0 ++ [{Cid, C}]}, - Effs}; - _ -> {State0, []} - end, + {State1, Effects0} = + case maps:to_list(Cons0) of + [{{_, P} = Cid, C0}] when node(P) =:= Node -> + %% the consumer should be returned to waiting + %% and checked out messages should be returned + Effs = consumer_update_active_effects( + State0, Cid, C0, false, suspected_down, []), + Checked = C0#consumer.checked_out, + Credit = increase_credit(C0, maps:size(Checked)), + {St, Effs1} = return_all(State0, Effs, + Cid, C0#consumer{credit = Credit}), + %% if the consumer was cancelled there is a chance it got + %% removed when returning hence we need to be defensive here + Waiting = case St#?MODULE.consumers of + #{Cid := C} -> + Waiting0 ++ [{Cid, C}]; + _ -> + Waiting0 + end, + {St#?MODULE{consumers = #{}, + waiting_consumers = Waiting}, + Effs1}; + _ -> {State0, []} + end, WaitingConsumers = update_waiting_consumer_status(Node, State1, suspected_down), @@ -354,8 +368,8 @@ apply(_, {down, Pid, noconnection}, (_, E) -> E end, Enqs0), Effects = [{monitor, node, Node} | Effects1], - {State#?MODULE{enqueuers = Enqs}, ok, Effects}; -apply(_, {down, Pid, noconnection}, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, noconnection}, #?MODULE{consumers = Cons0, enqueuers = Enqs0} = State0) -> %% A node has been disconnected. This doesn't necessarily mean that @@ -367,23 +381,22 @@ apply(_, {down, Pid, noconnection}, %% all pids for the disconnected node will be marked as suspected not just %% the one we got the `down' command for Node = node(Pid), - ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), - {Cons, State, Effects1} = - maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0, - status = up} = C, - {Co, St0, Eff}) when node(P) =:= Node -> - {St, Eff0} = return_all(St0, Checked0, Eff, K, C), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, - suspected_down, Eff0), - {maps:put(K, C#consumer{status = suspected_down, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {State, Effects1} = + maps:fold( + fun({_, P} = Cid, #consumer{checked_out = Checked0, + status = up} = C0, + {St0, Eff}) when node(P) =:= Node -> + Credit = increase_credit(C0, map_size(Checked0)), + C = C0#consumer{status = suspected_down, + credit = Credit}, + {St, Eff0} = return_all(St0, Eff, Cid, C), + Eff1 = consumer_update_active_effects(St, Cid, C, false, + suspected_down, Eff0), + {St, Eff1}; + (_, _, {St, Eff}) -> + {St, Eff} + end, {State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{status = suspected_down}; (_, E) -> E @@ -392,34 +405,15 @@ apply(_, {down, Pid, noconnection}, % Monitor the node so that we can "unsuspect" these processes when the node % comes back, then re-issue all monitors and discover the final fate of % these processes - Effects2 = case maps:size(Cons) of - 0 -> - [{aux, inactive}, {monitor, node, Node}]; - _ -> - [{monitor, node, Node}] - end ++ Effects1, - %% TODO: should we run a checkout here? - {State#?MODULE{consumers = Cons, enqueuers = Enqs}, ok, Effects2}; -apply(Meta, {down, Pid, _Info}, #?MODULE{consumers = Cons0, - enqueuers = Enqs0} = State0) -> - % Remove any enqueuer for the same pid and enqueue any pending messages - % This should be ok as we won't see any more enqueues from this pid - State1 = case maps:take(Pid, Enqs0) of - {#enqueuer{pending = Pend}, Enqs} -> - lists:foldl(fun ({_, RIdx, RawMsg}, S) -> - enqueue(RIdx, RawMsg, S) - end, State0#?MODULE{enqueuers = Enqs}, Pend); - error -> - State0 - end, - {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), - % return checked out messages to main queue - % Find the consumers for the down pid - DownConsumers = maps:keys( - maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), - {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E, down) - end, {State2, Effects1}, DownConsumers), + Effects = case maps:size(State#?MODULE.consumers) of + 0 -> + [{aux, inactive}, {monitor, node, Node}]; + _ -> + [{monitor, node, Node}] + end ++ Effects1, + checkout(Meta, State#?MODULE{enqueuers = Enqs}, Effects); +apply(Meta, {down, Pid, _Info}, State0) -> + {State, Effects} = handle_down(Pid, State0), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, enqueuers = Enqs0, @@ -450,16 +444,50 @@ apply(Meta, {nodeup, Node}, #?MODULE{consumers = Cons0, Acc end, {Cons0, SQ0, Monitors}, Cons0), Waiting = update_waiting_consumer_status(Node, State0, up), - State1 = State0#?MODULE{consumers = Cons1, enqueuers = Enqs1, + State1 = State0#?MODULE{consumers = Cons1, + enqueuers = Enqs1, service_queue = SQ, waiting_consumers = Waiting}, {State, Effects} = activate_next_consumer(State1, Effects1), checkout(Meta, State, Effects); apply(_, {nodedown, _Node}, State) -> {State, ok}; +apply(_, #purge_nodes{nodes = Nodes}, State0) -> + {State, Effects} = lists:foldl(fun(Node, {S, E}) -> + purge_node(Node, S, E) + end, {State0, []}, Nodes), + {State, ok, Effects}; apply(Meta, #update_config{config = Conf}, State) -> checkout(Meta, update_config(Conf, State), []). +purge_node(Node, State, Effects) -> + lists:foldl(fun(Pid, {S0, E0}) -> + {S, E} = handle_down(Pid, S0), + {S, E0 ++ E} + end, {State, Effects}, all_pids_for(Node, State)). + +%% any downs that re not noconnection +handle_down(Pid, #?MODULE{consumers = Cons0, + enqueuers = Enqs0} = State0) -> + % Remove any enqueuer for the same pid and enqueue any pending messages + % This should be ok as we won't see any more enqueues from this pid + State1 = case maps:take(Pid, Enqs0) of + {#enqueuer{pending = Pend}, Enqs} -> + lists:foldl(fun ({_, RIdx, RawMsg}, S) -> + enqueue(RIdx, RawMsg, S) + end, State0#?MODULE{enqueuers = Enqs}, Pend); + error -> + State0 + end, + {Effects1, State2} = handle_waiting_consumer_down(Pid, State1), + % return checked out messages to main queue + % Find the consumers for the down pid + DownConsumers = maps:keys( + maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), + lists:foldl(fun(ConsumerId, {S, E}) -> + cancel_consumer(ConsumerId, S, E, down) + end, {State2, Effects1}, DownConsumers). + consumer_active_flag_update_function(#?MODULE{cfg = #cfg{consumer_strategy = competing}}) -> fun(State, ConsumerId, Consumer, Active, ActivityStatus, Effects) -> consumer_update_active_effects(State, ConsumerId, Consumer, Active, @@ -558,8 +586,10 @@ tick(_Ts, #?MODULE{cfg = #cfg{name = Name, query_consumer_count(State), % Consumers EnqueueBytes, CheckoutBytes}, + %% TODO: call a handler that works out if any known nodes need to be + %% purged and emit a command effect to append this to the log [{mod_call, rabbit_quorum_queue, - handle_tick, [QName, Metrics]}, {aux, emit}]. + handle_tick, [QName, Metrics, all_nodes(State)]}, {aux, emit}]. -spec overview(state()) -> map(). overview(#?MODULE{consumers = Cons, @@ -811,9 +841,9 @@ consumer_update_active_effects(#?MODULE{cfg = #cfg{resource = QName}}, | Effects]. cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> - case maps:take(ConsumerId, C0) of - {Consumer, Cons1} -> - {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, + case C0 of + #{ConsumerId := Consumer} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, S0, Effects0, Reason), %% The effects are emitted before the consumer is actually removed %% if the consumer has unacked messages. This is a bit weird but @@ -826,7 +856,7 @@ cancel_consumer0(ConsumerId, #?MODULE{consumers = C0} = S0, Effects0, Reason) -> _ -> {S, Effects} end; - error -> + _ -> %% already removed: do nothing {S0, Effects0} end. @@ -863,9 +893,9 @@ activate_next_consumer(#?MODULE{consumers = Cons, -maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, - Cons1, #?MODULE{consumers = C0, - service_queue = SQ0} = S0, +maybe_return_all(ConsumerId, Consumer, + #?MODULE{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> case Reason of consumer_cancel -> @@ -875,11 +905,12 @@ maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, credit = 0, status = cancelled}, C0, SQ0, Effects0), - {S0#?MODULE{consumers = Cons, service_queue = SQ}, Effects1}; + {S0#?MODULE{consumers = Cons, + service_queue = SQ}, Effects1}; down -> - {S1, Effects1} = return_all(S0, Checked0, Effects0, ConsumerId, - Consumer), - {S1#?MODULE{consumers = Cons1}, Effects1} + {S1, Effects1} = return_all(S0, Effects0, ConsumerId, Consumer), + {S1#?MODULE{consumers = maps:remove(ConsumerId, S1#?MODULE.consumers)}, + Effects1} end. apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> @@ -1009,41 +1040,46 @@ maybe_enqueue(RaftIdx, From, MsgSeqNo, RawMsg, Effects0, snd(T) -> element(2, T). -return(Meta, ConsumerId, MsgNumMsgs, Con0, Checked, - Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0} = State0) -> - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgNumMsgs))}, - {Cons, SQ, Effects1} = update_or_remove_sub(ConsumerId, Con, Cons0, - SQ0, Effects0), - {State1, Effects2} = lists:foldl( - fun({'$prefix_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, - ConsumerId, Con); - ({'$empty_msg', _} = Msg, {S0, E0}) -> - return_one(0, Msg, S0, E0, - ConsumerId, Con); - ({MsgNum, Msg}, {S0, E0}) -> - return_one(MsgNum, Msg, S0, E0, - ConsumerId, Con) - end, {State0, Effects1}, MsgNumMsgs), - checkout(Meta, State1#?MODULE{consumers = Cons, - service_queue = SQ}, - Effects2). +return(Meta, ConsumerId, Returned, + Effects0, #?MODULE{service_queue = SQ0} = State0) -> + {State1, Effects1} = maps:fold( + fun(MsgId, {Tag, _} = Msg, {S0, E0}) when Tag == '$prefix_msg'; + Tag == '$empty_msg'-> + return_one(MsgId, 0, Msg, S0, E0, ConsumerId); + (MsgId, {MsgNum, Msg}, {S0, E0}) -> + return_one(MsgId, MsgNum, Msg, S0, E0, + ConsumerId) + end, {State0, Effects0}, Returned), + #{ConsumerId := Con0} = Cons0 = State1#?MODULE.consumers, + Con = Con0#consumer{credit = increase_credit(Con0, map_size(Returned))}, + {Cons, SQ, Effects2} = update_or_remove_sub(ConsumerId, Con, Cons0, + SQ0, Effects1), + State = State1#?MODULE{consumers = Cons, + service_queue = SQ}, + checkout(Meta, State, Effects2). % used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, NumDiscarded, - Con0, Checked, Effects0, +complete(ConsumerId, Discarded, + #consumer{checked_out = Checked} = Con0, Effects0, #?MODULE{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> + MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], %% credit_mode = simple_prefetch should automatically top-up credit %% as messages are simple_prefetch or otherwise returned - Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, NumDiscarded)}, + Con = Con0#consumer{checked_out = maps:without(maps:keys(Discarded), Checked), + credit = increase_credit(Con0, maps:size(Discarded))}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), - {State0#?MODULE{consumers = Cons, + State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$prefix_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc); + ({'$empty_msg', Header}, Acc) -> + add_bytes_settle(Header, Acc) + end, State0, maps:values(Discarded)), + {State1#?MODULE{consumers = Cons, ra_indexes = Indexes, service_queue = SQ}, Effects}. @@ -1062,21 +1098,9 @@ increase_credit(#consumer{credit = Current}, Credit) -> complete_and_checkout(#{index := IncomingRaftIdx} = Meta, MsgIds, ConsumerId, #consumer{checked_out = Checked0} = Con0, Effects0, State0) -> - Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), - MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], - State1 = lists:foldl(fun({_, {_, {Header, _}}}, Acc) -> - add_bytes_settle(Header, Acc); - ({'$prefix_msg', Header}, Acc) -> - add_bytes_settle(Header, Acc); - ({'$empty_msg', Header}, Acc) -> - add_bytes_settle(Header, Acc) - end, State0, maps:values(Discarded)), - %% need to pass the length of discarded as $prefix_msgs would be filtered - %% by the above list comprehension - {State2, Effects1} = complete(ConsumerId, MsgRaftIdxs, - maps:size(Discarded), - Con0, Checked, Effects0, State1), + {State2, Effects1} = complete(ConsumerId, Discarded, Con0, + Effects0, State0), {State, ok, Effects} = checkout(Meta, State2, Effects1), % settle metrics are incremented separately update_smallest_raft_index(IncomingRaftIdx, State, Effects). @@ -1136,79 +1160,80 @@ find_next_cursor(Smallest, Cursors0, Potential) -> {Potential, Cursors0} end. -return_one(0, {Tag, Header0}, +return_one(MsgId, 0, {Tag, Header0}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) when Tag == '$prefix_msg'; Tag == '$empty_msg'-> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) when Tag == '$prefix_msg'; Tag == '$empty_msg' -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {Tag, Header}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> - Checked = Con#consumer.checked_out, - {State1, Effects} = complete(ConsumerId, [], 1, Con, Checked, - Effects0, State0), - {add_bytes_settle(Header, State1), Effects}; + complete(ConsumerId, #{MsgId => Msg}, Con0, Effects0, State0); _ -> %% this should not affect the release cursor in any way + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, State1 = case Tag of '$empty_msg' -> State0; _ -> add_in_memory_counts(maps:get(size, Header), State0) end, - {add_bytes_return(Header, - State1#?MODULE{returns = lqueue:in(Msg, Returns)}), + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in(Msg, Returns)}), Effects0} end; -return_one(MsgNum, {RaftId, {Header0, RawMsg}}, +return_one(MsgId, MsgNum, {RaftId, {Header0, RawMsg}}, #?MODULE{returns = Returns, + consumers = Consumers, cfg = #cfg{delivery_limit = DeliveryLimit}} = State0, - Effects0, ConsumerId, Con) -> - Header = maps:update_with(delivery_count, - fun (C) -> C+1 end, + Effects0, ConsumerId) -> + #consumer{checked_out = Checked} = Con0 = maps:get(ConsumerId, Consumers), + Header = maps:update_with(delivery_count, fun (C) -> C+1 end, 1, Header0), Msg = {RaftId, {Header, RawMsg}}, case maps:get(delivery_count, Header) of DeliveryCount when DeliveryCount > DeliveryLimit -> DlMsg = {MsgNum, Msg}, - Effects = dead_letter_effects(delivery_limit, - #{none => DlMsg}, + Effects = dead_letter_effects(delivery_limit, #{none => DlMsg}, State0, Effects0), - Checked = Con#consumer.checked_out, - {State1, Effects1} = complete(ConsumerId, [RaftId], 1, Con, Checked, - Effects, State0), - {add_bytes_settle(Header, State1), Effects1}; + complete(ConsumerId, #{MsgId => DlMsg}, Con0, Effects, State0); _ -> + Con = Con0#consumer{checked_out = maps:remove(MsgId, Checked)}, + %% this should not affect the release cursor in any way State1 = case RawMsg of 'empty' -> State0; _ -> add_in_memory_counts(maps:get(size, Header), State0) end, - %% this should not affect the release cursor in any way - {add_bytes_return(Header, - State1#?MODULE{returns = - lqueue:in({MsgNum, Msg}, Returns)}), + {add_bytes_return( + Header, + State1#?MODULE{consumers = Consumers#{ConsumerId => Con}, + returns = lqueue:in({MsgNum, Msg}, Returns)}), Effects0} end. -return_all(State0, Checked0, Effects0, ConsumerId, Consumer) -> +return_all(#?MODULE{consumers = Cons} = State0, Effects0, ConsumerId, + #consumer{checked_out = Checked0} = Con) -> %% need to sort the list so that we return messages in the order %% they were checked out Checked = lists:sort(maps:to_list(Checked0)), - lists:foldl(fun ({_, {'$prefix_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E, ConsumerId, Consumer); - ({_, {'$empty_msg', _} = Msg}, {S, E}) -> - return_one(0, Msg, S, E, ConsumerId, Consumer); - ({_, {MsgNum, Msg}}, {S, E}) -> - return_one(MsgNum, Msg, S, E, ConsumerId, Consumer) - end, {State0, Effects0}, Checked). + State = State0#?MODULE{consumers = Cons#{ConsumerId => Con}}, + lists:foldl(fun ({MsgId, {'$prefix_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {'$empty_msg', _} = Msg}, {S, E}) -> + return_one(MsgId, 0, Msg, S, E, ConsumerId); + ({MsgId, {MsgNum, Msg}}, {S, E}) -> + return_one(MsgId, MsgNum, Msg, S, E, ConsumerId) + end, {State, Effects0}, Checked). %% checkout new messages to consumers %% reverses the effects list checkout(#{index := Index}, State0, Effects0) -> {State1, _Result, Effects1} = checkout0(checkout_one(State0), Effects0, {#{}, #{}}), - case evaluate_limit(State0#?MODULE.ra_indexes, false, - State1, Effects1) of + case evaluate_limit(false, State1, Effects1) of {State, true, Effects} -> update_smallest_raft_index(Index, State, Effects); {State, false, Effects} -> @@ -1238,17 +1263,16 @@ checkout0({Activity, State0}, Effects0, {SendAcc, LogAcc}) -> end, {State0, ok, lists:reverse(Effects1)}. -evaluate_limit(_OldIndexes, Result, +evaluate_limit(Result, #?MODULE{cfg = #cfg{max_length = undefined, max_bytes = undefined}} = State, Effects) -> {State, Result, Effects}; -evaluate_limit(OldIndexes, Result, - State0, Effects0) -> +evaluate_limit(Result, State0, Effects0) -> case is_over_limit(State0) of true -> {State, Effects} = drop_head(State0, Effects0), - evaluate_limit(OldIndexes, true, State, Effects); + evaluate_limit(true, State, Effects); false -> {State0, Result, Effects0} end. @@ -1596,6 +1620,10 @@ make_credit(ConsumerId, Credit, DeliveryCount, Drain) -> -spec make_purge() -> protocol(). make_purge() -> #purge{}. +-spec make_purge_nodes([node()]) -> protocol(). +make_purge_nodes(Nodes) -> + #purge_nodes{nodes = Nodes}. + -spec make_update_config(config()) -> protocol(). make_update_config(Config) -> #update_config{config = Config}. @@ -1643,6 +1671,39 @@ message_size(Msg) -> %% probably only hit this for testing so ok to use erts_debug erts_debug:size(Msg). +all_nodes(#?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Nodes0 = maps:fold(fun({_, P}, _, Acc) -> + Acc#{node(P) => ok} + end, #{}, Cons0), + Nodes1 = maps:fold(fun(P, _, Acc) -> + Acc#{node(P) => ok} + end, Nodes0, Enqs0), + maps:keys( + lists:foldl(fun({{_, P}, _}, Acc) -> + Acc#{node(P) => ok} + end, Nodes1, WaitingConsumers0)). + +all_pids_for(Node, #?MODULE{consumers = Cons0, + enqueuers = Enqs0, + waiting_consumers = WaitingConsumers0}) -> + Cons = maps:fold(fun({_, P}, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, [], Cons0), + Enqs = maps:fold(fun(P, _, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, _, Acc) -> Acc + end, Cons, Enqs0), + lists:foldl(fun({{_, P}, _}, Acc) + when node(P) =:= Node -> + [P | Acc]; + (_, Acc) -> Acc + end, Enqs, WaitingConsumers0). + suspected_pids_for(Node, #?MODULE{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index 65383b80af..162badc33d 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -945,10 +945,13 @@ with_running_or_clean_mnesia(Fun) -> false -> SavedMnesiaDir = dir(), application:unset_env(mnesia, dir), + SchemaLoc = application:get_env(mnesia, schema_location, opt_disc), + application:set_env(mnesia, schema_location, ram), mnesia:start(), Result = Fun(), application:stop(mnesia), application:set_env(mnesia, dir, SavedMnesiaDir), + application:set_env(mnesia, schema_location, SchemaLoc), Result end. diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 9abbd7cfff..45522b4fa3 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -389,6 +389,7 @@ force_connection_event_refresh(Ref) -> [rabbit_reader:force_event_refresh(C, Ref) || C <- connections()], ok. +-spec failed_to_recv_proxy_header(_, _) -> no_return(). failed_to_recv_proxy_header(Ref, Error) -> Msg = case Error of closed -> "error when receiving proxy header: TCP socket was ~p prematurely"; diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index b11a40cead..8258584e3c 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -28,7 +28,7 @@ -export([cluster_state/1, status/2]). -export([update_consumer_handler/8, update_consumer/9]). -export([cancel_consumer_handler/2, cancel_consumer/3]). --export([become_leader/2, handle_tick/2]). +-export([become_leader/2, handle_tick/3]). -export([rpc_delete_metrics/1]). -export([format/1]). -export([open_files/1]). @@ -249,7 +249,9 @@ rpc_delete_metrics(QName) -> ets:delete(queue_metrics, QName), ok. -handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> +handle_tick(QName, + {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}, + Nodes) -> %% this makes calls to remote processes so cannot be run inside the %% ra server Self = self(), @@ -272,7 +274,21 @@ handle_tick(QName, {Name, MR, MU, M, C, MsgBytesReady, MsgBytesUnack}) -> {messages_ready, MR}, {messages_unacknowledged, MU}, {reductions, R}]), - ok = repair_leader_record(QName, Self) + ok = repair_leader_record(QName, Self), + ExpectedNodes = rabbit_mnesia:cluster_nodes(all), + case Nodes -- ExpectedNodes of + [] -> + ok; + Stale -> + rabbit_log:info("~s: stale nodes detected. Purging ~w~n", + [rabbit_misc:rs(QName), Stale]), + %% pipeline purge command + {ok, Q} = rabbit_amqqueue:lookup(QName), + ok = ra:pipeline_command(amqqueue:get_pid(Q), + rabbit_fifo:make_purge_nodes(Stale)), + + ok + end end), ok. @@ -663,7 +679,8 @@ add_member(VHost, Name, Node) -> true -> case lists:member(Node, QNodes) of true -> - {error, already_a_member}; + %% idempotent by design + ok; false -> add_member(Q, Node) end @@ -709,16 +726,12 @@ delete_member(VHost, Name, Node) -> {error, classic_queue_not_supported}; {ok, Q} when ?amqqueue_is_quorum(Q) -> QNodes = amqqueue:get_quorum_nodes(Q), - case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) of + case lists:member(Node, QNodes) of false -> - {error, node_not_running}; + %% idempotent by design + ok; true -> - case lists:member(Node, QNodes) of - false -> - {error, not_a_member}; - true -> - delete_member(Q, Node) - end + delete_member(Q, Node) end; {error, not_found} = E -> E @@ -733,7 +746,7 @@ delete_member(Q, Node) when ?amqqueue_is_quorum(Q) -> %% deleting the last member is not allowed {error, last_node}; _ -> - case ra:leave_and_delete_server(ServerId) of + case ra:leave_and_delete_server(amqqueue:get_pid(Q), ServerId) of ok -> Fun = fun(Q1) -> amqqueue:set_quorum_nodes( diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 4aea1495a4..aa26cf5482 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -1229,7 +1229,7 @@ handle_method0(#'connection.open'{virtual_host = VHost}, throttle = Throttle}) -> ok = is_over_connection_limit(VHost, User), - ok = rabbit_access_control:check_vhost_access(User, VHost, Sock), + ok = rabbit_access_control:check_vhost_access(User, VHost, {socket, Sock}), ok = is_vhost_alive(VHost, User), NewConnection = Connection#connection{vhost = VHost}, ok = send_on_channel0(Sock, #'connection.open_ok'{}, Protocol), diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 60e9c9dac2..7dfc5fb573 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -16,7 +16,7 @@ -module(rabbit_table). --export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, +-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, wait/2, force_load/0, is_present/0, is_empty/0, needs_default_data/0, check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0, wait_for_replicated/0]). |
