diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-06-07 19:53:04 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-06-07 19:53:04 +0300 |
| commit | 23ce0c8fa3150a737d6c59f60ed99a72f33d2844 (patch) | |
| tree | 53fbbe009ad4f14ffeebd3f0210127515f0faa04 /src | |
| parent | 7ce71c3f5fd9d13949caa907343b0443762f67bf (diff) | |
| parent | def400e81db176b348e8ffc2574e47d8585e7fb1 (diff) | |
| download | rabbitmq-server-git-23ce0c8fa3150a737d6c59f60ed99a72f33d2844.tar.gz | |
Merge branch 'master' into rabbitmq-server-1767-protocol-specific-ctx-in-authn-authz
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 61 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 13 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 4 | ||||
| -rw-r--r-- | src/rabbit_policies.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_vhost_sup_sup.erl | 18 | ||||
| -rw-r--r-- | src/unconfirmed_messages.erl | 43 |
9 files changed, 107 insertions, 47 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 9e94dd8f27..85c647ae8c 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -761,7 +761,9 @@ check_dlxrk_arg({Type, _}, _Args) -> {error, {unacceptable_type, Type}}. check_overflow({longstr, Val}, _Args) -> - case lists:member(Val, [<<"drop-head">>, <<"reject-publish">>]) of + case lists:member(Val, [<<"drop-head">>, + <<"reject-publish">>, + <<"reject-publish-dlx">>]) of true -> ok; false -> {error, invalid_overflow} end; diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b3f89b7ef0..2185d7c95f 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -82,7 +82,7 @@ %% max length in bytes, if configured max_bytes, %% an action to perform if queue is to be over a limit, - %% can be either drop-head (default) or reject-publish + %% can be either drop-head (default), reject-publish or reject-publish-dlx overflow, %% when policies change, this version helps queue %% determine what previously scheduled/set up state to ignore, @@ -163,7 +163,7 @@ init_state(Q) -> has_had_consumers = false, consumers = rabbit_queue_consumers:new(), senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), + msg_id_to_channel = #{}, status = running, args_policy_version = 0, overflow = 'drop-head', @@ -261,7 +261,7 @@ recovery_barrier(BarrierPid) -> -spec init_with_backing_queue_state (amqqueue:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> + [rabbit_types:delivery()], pmon:pmon(), maps:map()) -> #q{}. init_with_backing_queue_state(Q, BQ, BQS, @@ -599,16 +599,26 @@ confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> - case gb_trees:lookup(MsgId, MTC0) of - {value, {SenderPid, MsgSeqNo}} -> - {rabbit_misc:gb_trees_cons(SenderPid, - MsgSeqNo, CMs), - gb_trees:delete(MsgId, MTC0)}; + case maps:get(MsgId, MTC0, none) of none -> - {CMs, MTC0} + {CMs, MTC0}; + {SenderPid, MsgSeqNo} -> + {maps:update_with(SenderPid, + fun(MsgSeqNos) -> + [MsgSeqNo | MsgSeqNos] + end, + [MsgSeqNo], + CMs), + maps:remove(MsgId, MTC0)} + end - end, {gb_trees:empty(), MTC}, MsgIds), - rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), + end, {#{}, MTC}, MsgIds), + maps:fold( + fun(Pid, MsgSeqNos, _) -> + rabbit_misc:confirm_to_sender(Pid, MsgSeqNos) + end, + ok, + CMs), MTC1. send_or_record_confirm(#delivery{confirm = false}, State) -> @@ -622,7 +632,7 @@ send_or_record_confirm(#delivery{confirm = true, State = #q{q = Q, msg_id_to_channel = MTC}) when ?amqqueue_is_durable(Q) -> - MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + MTC1 = maps:put(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; send_or_record_confirm(#delivery{confirm = true, sender = SenderPid, @@ -704,12 +714,25 @@ maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, Delivered, State = #q{overflow = Overflow, backing_queue = BQ, - backing_queue_state = BQS}) -> + backing_queue_state = BQS, + dlx = DLX, + dlx_routing_key = RK}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); + {true, 'reject-publish-dlx'} -> + %% Publish to DLX + with_dlx( + DLX, + fun (X) -> + QName = qname(State), + rabbit_dead_letter:publish(Message, maxlen, X, RK, QName) + end, + fun () -> ok end), + %% Drop publish and nack to publisher + send_reject_publish(Delivery, Delivered, State); _ -> {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), State1 = State#q{backing_queue_state = BQS1}, @@ -766,6 +789,8 @@ maybe_drop_head(State = #q{max_length = undefined, {false, State}; maybe_drop_head(State = #q{overflow = 'reject-publish'}) -> {false, State}; +maybe_drop_head(State = #q{overflow = 'reject-publish-dlx'}) -> + {false, State}; maybe_drop_head(State = #q{overflow = 'drop-head'}) -> maybe_drop_head(false, State). @@ -786,14 +811,18 @@ maybe_drop_head(AlreadyDropped, State = #q{backing_queue = BQ, end. send_reject_publish(#delivery{confirm = true, - sender = SenderPid, - msg_seq_no = MsgSeqNo} = Delivery, + sender = SenderPid, + flow = Flow, + msg_seq_no = MsgSeqNo, + message = #basic_message{id = MsgId}}, _Delivered, State = #q{ backing_queue = BQ, backing_queue_state = BQS, msg_id_to_channel = MTC}) -> - {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}), + + MTC1 = maps:remove(MsgId, MTC), + BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, _Delivered, State) -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index cf61ae243f..f5c9e8dfce 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -189,6 +189,7 @@ messages_unconfirmed, messages_uncommitted, acks_uncommitted, + pending_raft_commands, prefetch_count, global_prefetch_count, state, @@ -2241,10 +2242,11 @@ confirm(MsgSeqNos, QRef, State = #ch{queue_names = QNames, unconfirmed = UC}) -> %% does not exist in unconfirmed messages. %% Neither does the 'ignore' atom, so it's a reasonable fallback. QName = maps:get(QRef, QNames, ignore), - {MXs, UC1} = + {ConfirmMXs, RejectMXs, UC1} = unconfirmed_messages:confirm_multiple_msg_ref(MsgSeqNos, QName, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. - record_confirms(MXs, State#ch{unconfirmed = UC1}). + State1 = record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}), + record_rejects(RejectMXs, State1). send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; @@ -2371,6 +2373,8 @@ i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; +i(pending_raft_commands, #ch{queue_states = QS}) -> + pending_raft_commands(QS); i(state, #ch{cfg = #conf{state = running}}) -> credit_flow:state(); i(state, #ch{cfg = #conf{state = State}}) -> State; i(prefetch_count, #ch{cfg = #conf{consumer_prefetch = C}}) -> C; @@ -2386,6 +2390,11 @@ i(reductions, _State) -> i(Item, _) -> throw({bad_argument, Item}). +pending_raft_commands(QStates) -> + maps:fold(fun (_, V, Acc) -> + Acc + rabbit_fifo_client:pending_size(V) + end, 0, QStates). + name(#ch{cfg = #conf{conn_name = ConnName, channel = Channel}}) -> list_to_binary(rabbit_misc:format("~s (~p)", [ConnName, Channel])). diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a3c241aff2..136800cc99 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -39,6 +39,7 @@ purge/1, cluster_name/1, update_machine_state/2, + pending_size/1, stat/1 ]). @@ -409,6 +410,10 @@ purge(Node) -> Err end. +-spec pending_size(state()) -> non_neg_integer(). +pending_size(#state{pending = Pend}) -> + maps:size(Pend). + -spec stat(ra_server_id()) -> {ok, non_neg_integer(), non_neg_integer()} | {error | timeout, term()}. diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f7a122f98a..22df1751e5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -717,10 +717,10 @@ promote_me(From, #state { q = Q0, QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0); (_Msgid, _Status, MTC0) -> MTC0 - end, gb_trees:empty(), MS), + end, #{}, MS), Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ), Delivery <- queue:to_list(PubQ)], diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index e3b23cfbca..5271d503eb 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -1971,7 +1971,7 @@ cleanup_after_file_deletion(File, %%---------------------------------------------------------------------------- -spec combine_files(non_neg_integer(), non_neg_integer(), gc_state()) -> - {ok, deletion_thunk()} | {defer, non_neg_integer()}. + {ok, deletion_thunk()} | {defer, [non_neg_integer()]}. combine_files(Source, Destination, State = #gc_state { file_summary_ets = FileSummaryEts }) -> @@ -2073,7 +2073,7 @@ do_combine_files(SourceSummary, DestinationSummary, gen_server2:cast(Server, {combine_files, Source, Destination, Reclaimed}), safe_file_delete_fun(Source, Dir, FileHandlesEts). --spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, non_neg_integer()}. +-spec delete_file(non_neg_integer(), gc_state()) -> {ok, deletion_thunk()} | {defer, [non_neg_integer()]}. delete_file(File, State = #gc_state { file_summary_ets = FileSummaryEts, file_handles_ets = FileHandlesEts, diff --git a/src/rabbit_policies.erl b/src/rabbit_policies.erl index 7878bed02d..c4f4226448 100644 --- a/src/rabbit_policies.erl +++ b/src/rabbit_policies.erl @@ -131,6 +131,8 @@ validate_policy0(<<"overflow">>, <<"drop-head">>) -> ok; validate_policy0(<<"overflow">>, <<"reject-publish">>) -> ok; +validate_policy0(<<"overflow">>, <<"reject-publish-dlx">>) -> + ok; validate_policy0(<<"overflow">>, Value) -> {error, "~p is not a valid overflow value", [Value]}; diff --git a/src/rabbit_vhost_sup_sup.erl b/src/rabbit_vhost_sup_sup.erl index 46a2b485f2..d9d6091c39 100644 --- a/src/rabbit_vhost_sup_sup.erl +++ b/src/rabbit_vhost_sup_sup.erl @@ -30,6 +30,7 @@ save_vhost_process/2]). -export([delete_on_all_nodes/1, start_on_all_nodes/1]). -export([is_vhost_alive/1]). +-export([check/0]). %% Internal -export([stop_and_delete_vhost/1]). @@ -260,3 +261,20 @@ vhost_restart_strategy() -> transient -> transient; permanent -> permanent end. + +check() -> + VHosts = rabbit_vhost:list(), + lists:filter( + fun(V) -> + case rabbit_vhost_sup_sup:get_vhost_sup(V) of + {ok, Sup} -> + MsgStores = [Pid || {Name, Pid, _, _} <- supervisor:which_children(Sup), + lists:member(Name, [msg_store_persistent, + msg_store_transient])], + not is_vhost_alive(V) orelse (not lists:all(fun(P) -> + erlang:is_process_alive(P) + end, MsgStores)); + {error, _} -> + true + end + end, VHosts). diff --git a/src/unconfirmed_messages.erl b/src/unconfirmed_messages.erl index 63a504a239..0a4b533448 100644 --- a/src/unconfirmed_messages.erl +++ b/src/unconfirmed_messages.erl @@ -33,7 +33,6 @@ -export([new/0, insert/5, - confirm_msg_ref/4, confirm_multiple_msg_ref/4, forget_ref/2, @@ -112,27 +111,22 @@ insert(MsgId, QueueNames, QueueRefs, XName, error({message_already_exists, MsgId, QueueNames, QueueRefs, XName, UC}) end. -%% Confirms a message on behalf of the given queue. If it was the last queue (ref) -%% on the waiting list, returns 'confirmed' and performs the necessary cleanup. --spec confirm_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. -confirm_msg_ref(MsgId, QueueName, QueueRef, - #unconfirmed{reverse = Reverse} = UC) -> - remove_msg_ref(confirm, MsgId, QueueName, QueueRef, - UC#unconfirmed{reverse = remove_from_reverse(QueueRef, [MsgId], Reverse)}). - +%% Confirms messages on behalf of the given queue. If it was the last queue (ref) +%% on the waiting list, returns message id and excahnge name +%% and performs the necessary cleanup. -spec confirm_multiple_msg_ref(msg_id(), queue_name(), queue_ref(), ?MODULE()) -> - {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. + {[{msg_id(), exchange_name()}], [{msg_id(), exchange_name()}], ?MODULE()}. confirm_multiple_msg_ref(MsgIds, QueueName, QueueRef, #unconfirmed{reverse = Reverse} = UC0) -> lists:foldl( - fun(MsgId, {C, UC}) -> + fun(MsgId, {C, R, UC}) -> case remove_msg_ref(confirm, MsgId, QueueName, QueueRef, UC) of - {{confirmed, V}, UC1} -> {[V | C], UC1}; - {not_confirmed, UC1} -> {C, UC1} + {{confirmed, V}, UC1} -> {[V | C], R, UC1}; + {{rejected, V}, UC1} -> {C, [V | R], UC1}; + {not_confirmed, UC1} -> {C, R, UC1} end end, - {[], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, + {[], [], UC0#unconfirmed{reverse = remove_from_reverse(QueueRef, MsgIds, Reverse)}}, MsgIds). %% Removes all messages for a queue. @@ -179,14 +173,15 @@ reject_msg(MsgId, #unconfirmed{ordered = Ordered, index = Index, reverse = Rever {Rejected :: [{msg_id(), exchange_name()}], ?MODULE()}. reject_all_for_queue(QueueRef, #unconfirmed{reverse = Reverse0} = UC0) -> MsgIds = maps:keys(maps:get(QueueRef, Reverse0, #{})), - lists:foldl(fun(MsgId, {R, UC}) -> - case reject_msg(MsgId, UC) of - {not_confirmed, UC1} -> {R, UC1}; - {{rejected, V}, UC1} -> {[V | R], UC1} - end - end, - {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, - MsgIds). + lists:foldl( + fun(MsgId, {R, UC}) -> + case reject_msg(MsgId, UC) of + {not_confirmed, UC1} -> {R, UC1}; + {{rejected, V}, UC1} -> {[V | R], UC1} + end + end, + {[], UC0#unconfirmed{reverse = maps:remove(QueueRef, Reverse0)}}, + MsgIds). %% Returns a smallest message id. -spec smallest(?MODULE()) -> msg_id(). @@ -238,7 +233,7 @@ remove_multiple_from_reverse(Refs, MsgIds, Reverse0) -> Reverse0, Refs). --spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name(), queue_ref(), ?MODULE()) -> +-spec remove_msg_ref(confirm | no_confirm, msg_id(), queue_name() | 'ignore', queue_ref(), ?MODULE()) -> {{confirmed | rejected, {msg_id(), exchange_name()}} | not_confirmed, ?MODULE()}. remove_msg_ref(Confirm, MsgId, QueueName, QueueRef, |
