diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-01-08 23:24:06 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-01-08 23:24:06 +0300 |
| commit | d21db02c75a732839ce90ad2bd587127905b975c (patch) | |
| tree | 1867fa3e0cd232f0fbf1bab5feade00344ee1394 /src | |
| parent | 7908688ce692ad93929bce9da19171add4772a1f (diff) | |
| parent | 1e2a202fcba50dcdce33ca363f93dd06fe78ebf9 (diff) | |
| download | rabbitmq-server-git-d21db02c75a732839ce90ad2bd587127905b975c.tar.gz | |
Merge branch 'master' into rabbitmq-server-1799-single-active-consumer-in-qq
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_amqqueue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_binding.erl | 73 | ||||
| -rw-r--r-- | src/rabbit_channel.erl | 58 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 44 | ||||
| -rw-r--r-- | src/rabbit_fifo_index.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_queue_location_min_masters.erl | 70 |
6 files changed, 156 insertions, 111 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 704ead75a7..c65ad299ed 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -29,7 +29,7 @@ -export([list/0, list/1, info_keys/0, info/1, info/2, info_all/1, info_all/2, emit_info_all/5, list_local/1, info_local/1, emit_info_local/4, emit_info_down/4]). --export([list_down/1, count/1, list_names/0, list_local_names/0]). +-export([list_down/1, count/1, list_names/0, list_names/1, list_local_names/0]). -export([list_by_type/1]). -export([notify_policy_changed/1]). -export([consumers/1, consumers_all/1, emit_consumers_all/4, consumer_info_keys/0]). @@ -437,8 +437,7 @@ internal_declare(Q = #amqqueue{name = QueueName}, false) -> not_found -> Q1 = rabbit_policy:set(Q), Q2 = Q1#amqqueue{state = live}, ok = store_queue(Q2), - B = add_default_binding(Q2), - fun () -> B(), {created, Q2} end; + fun () -> {created, Q2} end; {absent, _Q, _} = R -> rabbit_misc:const(R) end; [ExistingQ] -> @@ -502,15 +501,6 @@ policy_changed(Q1 = #amqqueue{decorators = Decorators1}, %% mirroring-related has changed - the policy may have changed anyway. notify_policy_changed(Q1). -add_default_binding(#amqqueue{name = QueueName}) -> - ExchangeName = rabbit_misc:r(QueueName, exchange, <<>>), - RoutingKey = QueueName#resource.name, - rabbit_binding:add(#binding{source = ExchangeName, - destination = QueueName, - key = RoutingKey, - args = []}, - ?INTERNAL_USER). - lookup([]) -> []; %% optimisation lookup([Name]) -> ets:lookup(rabbit_queue, Name); %% optimisation lookup(Names) when is_list(Names) -> @@ -764,6 +754,8 @@ list() -> mnesia:dirty_match_object(rabbit_queue, #amqqueue{_ = '_'}). list_names() -> mnesia:dirty_all_keys(rabbit_queue). +list_names(VHost) -> [Q#amqqueue.name || Q <- list(VHost)]. + list_local_names() -> [ Q#amqqueue.name || #amqqueue{state = State, pid = QPid} = Q <- list(), State =/= crashed, is_local_to_node(QPid, node())]. diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl index e96dfd7673..258e85ffa2 100644 --- a/src/rabbit_binding.erl +++ b/src/rabbit_binding.erl @@ -27,6 +27,10 @@ -export([has_for_source/1, remove_for_source/1, remove_for_destination/2, remove_transient_for_destination/1]). +-define(DEFAULT_EXCHANGE(VHostPath), #resource{virtual_host = VHostPath, + kind = exchange, + name = <<>>}). + %%---------------------------------------------------------------------------- -export_type([key/0, deletions/0]). @@ -156,6 +160,14 @@ recover_semi_durable_route_txn(R = #route{binding = B}, X) -> (Serial, false) -> x_callback(Serial, X, add_binding, B) end). +exists(#binding{source = ?DEFAULT_EXCHANGE(_), + destination = #resource{kind = queue, name = QName} = Queue, + key = QName, + args = []}) -> + case rabbit_amqqueue:lookup(Queue) of + {ok, _} -> true; + {error, not_found} -> false + end; exists(Binding) -> binding_action( Binding, fun (_Src, _Dst, B) -> @@ -243,9 +255,17 @@ list(VHostPath) -> destination = VHostResource, _ = '_'}, _ = '_'}, - [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, - Route)]. - + %% if there are any default exchange bindings left after an upgrade + %% of a pre-3.8 database, filter them out + AllBindings = [B || #route{binding = B} <- mnesia:dirty_match_object(rabbit_route, + Route)], + Filtered = lists:filter(fun(#binding{source = S}) -> + S =/= ?DEFAULT_EXCHANGE(VHostPath) + end, AllBindings), + implicit_bindings(VHostPath) ++ Filtered. + +list_for_source(?DEFAULT_EXCHANGE(VHostPath)) -> + implicit_bindings(VHostPath); list_for_source(SrcName) -> mnesia:async_dirty( fun() -> @@ -255,16 +275,43 @@ list_for_source(SrcName) -> end). list_for_destination(DstName) -> - mnesia:async_dirty( - fun() -> - Route = #route{binding = #binding{destination = DstName, - _ = '_'}}, - [reverse_binding(B) || - #reverse_route{reverse_binding = B} <- - mnesia:match_object(rabbit_reverse_route, - reverse_route(Route), read)] - end). - + implicit_for_destination(DstName) ++ + mnesia:async_dirty( + fun() -> + Route = #route{binding = #binding{destination = DstName, + _ = '_'}}, + [reverse_binding(B) || + #reverse_route{reverse_binding = B} <- + mnesia:match_object(rabbit_reverse_route, + reverse_route(Route), read)] + end). + +implicit_bindings(VHostPath) -> + DstQueues = rabbit_amqqueue:list_names(VHostPath), + [ #binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []} + || DstQueue = #resource{name = QName} <- DstQueues ]. + +implicit_for_destination(DstQueue = #resource{kind = queue, + virtual_host = VHostPath, + name = QName}) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; +implicit_for_destination(_) -> + []. + +list_for_source_and_destination(?DEFAULT_EXCHANGE(VHostPath), + #resource{kind = queue, + virtual_host = VHostPath, + name = QName} = DstQueue) -> + [#binding{source = ?DEFAULT_EXCHANGE(VHostPath), + destination = DstQueue, + key = QName, + args = []}]; list_for_source_and_destination(SrcName, DstName) -> mnesia:async_dirty( fun() -> diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d1f3b06528..eeae247193 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -72,7 +72,7 @@ -export([get_vhost/1, get_user/1]). %% For testing -export([build_topic_variable_map/3]). --export([list_queue_states/1]). +-export([list_queue_states/1, get_max_message_size/0]). %% Mgmt HTTP API refactor -export([handle_method/5]). @@ -158,7 +158,9 @@ delivery_flow, interceptor_state, queue_states, - queue_cleanup_timer + queue_cleanup_timer, + %% Message content size limit + max_message_size }). -define(QUEUE, lqueue). @@ -441,6 +443,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, _ -> Limiter0 end, + MaxMessageSize = get_max_message_size(), State = #ch{state = starting, protocol = Protocol, channel = Channel, @@ -473,7 +476,8 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, reply_consumer = none, delivery_flow = Flow, interceptor_state = undefined, - queue_states = #{}}, + queue_states = #{}, + max_message_size = MaxMessageSize}, State1 = State#ch{ interceptor_state = rabbit_channel_interceptor:init(State)}, State2 = rabbit_event:init_stats_timer(State1, #ch.stats_timer), @@ -793,6 +797,16 @@ code_change(_OldVsn, State, _Extra) -> format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ). +-spec get_max_message_size() -> non_neg_integer(). + +get_max_message_size() -> + case application:get_env(rabbit, max_message_size) of + {ok, MS} when is_integer(MS) -> + erlang:min(MS, ?MAX_MSG_SIZE); + _ -> + ?MAX_MSG_SIZE + end. + %%--------------------------------------------------------------------------- reply(Reply, NewState) -> {reply, Reply, next_state(NewState), hibernate}. @@ -985,12 +999,19 @@ extract_topic_variable_map_from_amqp_params([{amqp_params, {amqp_params_direct, extract_topic_variable_map_from_amqp_params(_) -> #{}. -check_msg_size(Content) -> +check_msg_size(Content, MaxMessageSize) -> Size = rabbit_basic:maybe_gc_large_msg(Content), - case Size > ?MAX_MSG_SIZE of - true -> precondition_failed("message size ~B larger than max size ~B", - [Size, ?MAX_MSG_SIZE]); - false -> ok + case Size of + S when S > MaxMessageSize -> + ErrorMessage = case MaxMessageSize of + ?MAX_MSG_SIZE -> + "message size ~B is larger than max size ~B"; + _ -> + "message size ~B is larger than configured max size ~B" + end, + precondition_failed(ErrorMessage, + [Size, MaxMessageSize]); + _ -> ok end. check_vhost_queue_limit(#resource{name = QueueName}, VHost) -> @@ -1164,16 +1185,17 @@ handle_method(#'basic.publish'{immediate = true}, _Content, _State) -> handle_method(#'basic.publish'{exchange = ExchangeNameBin, routing_key = RoutingKey, mandatory = Mandatory}, - Content, State = #ch{virtual_host = VHostPath, - tx = Tx, - channel = ChannelNum, - confirm_enabled = ConfirmEnabled, - trace_state = TraceState, - user = #user{username = Username} = User, - conn_name = ConnName, - delivery_flow = Flow, - conn_pid = ConnPid}) -> - check_msg_size(Content), + Content, State = #ch{virtual_host = VHostPath, + tx = Tx, + channel = ChannelNum, + confirm_enabled = ConfirmEnabled, + trace_state = TraceState, + user = #user{username = Username} = User, + conn_name = ConnName, + delivery_flow = Flow, + conn_pid = ConnPid, + max_message_size = MaxMessageSize}) -> + check_msg_size(Content, MaxMessageSize), ExchangeName = rabbit_misc:r(VHostPath, exchange, ExchangeNameBin), check_write_permitted(ExchangeName, User), Exchange = rabbit_exchange:lookup_or_die(ExchangeName), diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 938e8f77fd..1536cd1f51 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -428,26 +428,20 @@ apply(_, #checkout{spec = Spec, meta = Meta, State1 = update_consumer(ConsumerId, Meta, Spec, State0), checkout(State1, [{monitor, process, Pid}]); apply(#{index := RaftIdx}, #purge{}, - #state{consumers = Cons0, ra_indexes = Indexes } = State0) -> - Total = rabbit_fifo_index:size(Indexes), - {State1, Effects1} = - maps:fold( - fun(ConsumerId, C = #consumer{checked_out = Checked0}, - {StateAcc0, EffectsAcc0}) -> - MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} - <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, - #{}, EffectsAcc0, StateAcc0) - end, {State0, []}, Cons0), - {State, _, Effects} = - update_smallest_raft_index( - RaftIdx, Indexes, - State1#state{ra_indexes = rabbit_fifo_index:empty(), - messages = #{}, - returns = lqueue:new(), - msg_bytes_enqueue = 0, - msg_bytes_checkout = 0, - low_msg_num = undefined}, Effects1), + #state{ra_indexes = Indexes0, + messages = Messages} = State0) -> + Total = maps:size(Messages), + Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, + Indexes0, + [I || {I, _} <- lists:sort(maps:values(Messages))]), + {State, _, Effects} = + update_smallest_raft_index(RaftIdx, Indexes0, + State0#state{ra_indexes = Indexes, + messages = #{}, + returns = lqueue:new(), + msg_bytes_enqueue = 0, + low_msg_num = undefined}, + []), %% as we're not checking out after a purge (no point) we have to %% reverse the effects ourselves {State, {purge, Total}, @@ -554,7 +548,8 @@ state_enter(leader, #state{consumers = Cons, Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), Mons = [{monitor, process, P} || P <- Pids], Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], - Effects = Mons ++ Nots, + NodeMons = lists:usort([{monitor, node, node(P)} || P <- Pids]), + Effects = Mons ++ Nots ++ NodeMons, case BLH of undefined -> Effects; @@ -1940,11 +1935,12 @@ purge_with_checkout_test() -> %% assert message bytes are non zero ?assert(State2#state.msg_bytes_checkout > 0), ?assert(State2#state.msg_bytes_enqueue > 0), - {State3, {purge, 2}, _} = apply(meta(2), make_purge(), State2), - ?assertEqual(0, State3#state.msg_bytes_checkout), + {State3, {purge, 1}, _} = apply(meta(2), make_purge(), State2), + ?assert(State2#state.msg_bytes_checkout > 0), ?assertEqual(0, State3#state.msg_bytes_enqueue), + ?assertEqual(1, rabbit_fifo_index:size(State3#state.ra_indexes)), #consumer{checked_out = Checked} = maps:get(Cid, State3#state.consumers), - ?assertEqual(0, maps:size(Checked)), + ?assertEqual(1, maps:size(Checked)), ok. down_returns_checked_out_in_order_test() -> diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 345a99a03c..f8f414f453 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -56,10 +56,10 @@ return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> State#?MODULE{data = maps:put(Key, Value, Data)}. --spec delete(integer(), state()) -> state(). +-spec delete(Index :: integer(), state()) -> state(). delete(Smallest, #?MODULE{data = Data0, - largest = Largest, - smallest = Smallest} = State) -> + largest = Largest, + smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> diff --git a/src/rabbit_queue_location_min_masters.erl b/src/rabbit_queue_location_min_masters.erl index c532714d41..7c386855f6 100644 --- a/src/rabbit_queue_location_min_masters.erl +++ b/src/rabbit_queue_location_min_masters.erl @@ -38,45 +38,33 @@ description() -> <<"Locate queue master node from cluster node with least bound queues">>}]. queue_master_location(#amqqueue{} = Q) -> - Cluster = rabbit_queue_master_location_misc:all_nodes(Q), - VHosts = rabbit_vhost:list(), - BoundQueueMasters = get_bound_queue_masters_per_vhost(VHosts, []), - {_Count, MinMaster}= get_min_master(Cluster, BoundQueueMasters), - {ok, MinMaster}. + Cluster = rabbit_queue_master_location_misc:all_nodes(Q), + QueueNames = rabbit_amqqueue:list_names(), + MastersPerNode = lists:foldl( + fun(#resource{virtual_host = VHost, name = QueueName}, NodeMasters) -> + case rabbit_queue_master_location_misc:lookup_master(QueueName, VHost) of + {ok, Master} when is_atom(Master) -> + case maps:is_key(Master, NodeMasters) of + true -> maps:update_with(Master, + fun(N) -> N + 1 end, + NodeMasters); + false -> NodeMasters + end; + _ -> NodeMasters + end + end, + maps:from_list([{N, 0} || N <- Cluster]), + QueueNames), -%%--------------------------------------------------------------------------- -%% Private helper functions -%%--------------------------------------------------------------------------- -get_min_master(Cluster, BoundQueueMasters) -> - lists:min([ {count_masters(Node, BoundQueueMasters), Node} || - Node <- Cluster ]). - -count_masters(Node, Masters) -> - length([ X || X <- Masters, X == Node ]). - -get_bound_queue_masters_per_vhost([], Acc) -> - lists:flatten(Acc); -get_bound_queue_masters_per_vhost([VHost|RemVHosts], Acc) -> - BoundQueueNames = - lists:filtermap( - fun(#binding{destination =#resource{kind = queue, - name = QueueName}}) -> - {true, QueueName}; - (_) -> - false - end, - rabbit_binding:list(VHost)), - UniqQueueNames = lists:usort(BoundQueueNames), - BoundQueueMasters = get_queue_masters(VHost, UniqQueueNames, []), - get_bound_queue_masters_per_vhost(RemVHosts, [BoundQueueMasters|Acc]). - - -get_queue_masters(_VHost, [], BoundQueueNodes) -> BoundQueueNodes; -get_queue_masters(VHost, [QueueName | RemQueueNames], QueueMastersAcc) -> - QueueMastersAcc0 = case rabbit_queue_master_location_misc:lookup_master( - QueueName, VHost) of - {ok, Master} when is_atom(Master) -> - [Master|QueueMastersAcc]; - _ -> QueueMastersAcc - end, - get_queue_masters(VHost, RemQueueNames, QueueMastersAcc0). + {MinNode, _NMasters} = maps:fold( + fun(Node, NMasters, init) -> + {Node, NMasters}; + (Node, NMasters, {MinNode, MinMasters}) -> + case NMasters < MinMasters of + true -> {Node, NMasters}; + false -> {MinNode, MinMasters} + end + end, + init, + MastersPerNode), + {ok, MinNode}.
\ No newline at end of file |
