diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-03-15 22:08:55 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-03-15 22:08:55 +0300 |
| commit | 2a26daab5997b9aa41087ea94edf02d510469521 (patch) | |
| tree | f35b53eaff99dd483e074a86e412e2ccfe96dc45 /src | |
| parent | 6dce82e7c5847bfb01c380912b4a9d53d0deb9f7 (diff) | |
| parent | 2f8053da4213e4d4d5df5c2f6dc6871b7054d844 (diff) | |
| download | rabbitmq-server-git-2a26daab5997b9aa41087ea94edf02d510469521.tar.gz | |
Merge branch 'master' into rabbitmq-server-1906
Diffstat (limited to 'src')
| -rw-r--r-- | src/amqqueue.erl | 2 | ||||
| -rw-r--r-- | src/rabbit.erl | 49 | ||||
| -rw-r--r-- | src/rabbit_auth_mechanism_amqplain.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_boot_steps.erl | 17 | ||||
| -rw-r--r-- | src/rabbit_fifo.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_lager.erl | 35 | ||||
| -rw-r--r-- | src/rabbit_vhost.erl | 3 |
7 files changed, 80 insertions, 45 deletions
diff --git a/src/amqqueue.erl b/src/amqqueue.erl index b9c278cdab..d0f3a45aca 100644 --- a/src/amqqueue.erl +++ b/src/amqqueue.erl @@ -431,7 +431,7 @@ set_pid(Queue, Pid) -> % policy --spec get_policy(amqqueue()) -> binary() | none | undefined. +-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined. get_policy(#amqqueue{policy = Policy}) -> Policy; get_policy(Queue) -> amqqueue_v1:get_policy(Queue). diff --git a/src/rabbit.erl b/src/rabbit.erl index 20f9b17abf..003abd141f 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -197,43 +197,52 @@ -rabbit_boot_step({routing_ready, [{description, "message delivery logic ready"}, - {requires, core_initialized}]}). - --rabbit_boot_step({direct_client, - [{description, "direct client"}, - {mfa, {rabbit_direct, boot, []}}, - {requires, routing_ready}]}). + {requires, [core_initialized, recovery]}]}). -rabbit_boot_step({connection_tracking, - [{description, "sets up internal storage for node-local connections"}, + [{description, "connection tracking infrastructure"}, {mfa, {rabbit_connection_tracking, boot, []}}, - {requires, routing_ready}]}). - --rabbit_boot_step({networking, - [{mfa, {rabbit_networking, boot, []}}, - {requires, routing_ready}]}). - --rabbit_boot_step({notify_cluster, - [{description, "notify cluster nodes"}, - {mfa, {rabbit_node_monitor, notify_node_up, []}}, - {requires, networking}]}). + {enables, routing_ready}]}). -rabbit_boot_step({background_gc, [{description, "background garbage collection"}, {mfa, {rabbit_sup, start_restartable_child, [background_gc]}}, - {enables, networking}]}). + {requires, [core_initialized, recovery]}, + {enables, routing_ready}]}). -rabbit_boot_step({rabbit_core_metrics_gc, [{description, "background core metrics garbage collection"}, {mfa, {rabbit_sup, start_restartable_child, [rabbit_core_metrics_gc]}}, - {enables, networking}]}). + {requires, [core_initialized, recovery]}, + {enables, routing_ready}]}). -rabbit_boot_step({rabbit_looking_glass, [{description, "Looking Glass tracer and profiler"}, {mfa, {rabbit_looking_glass, boot, []}}, - {requires, networking}]}). + {requires, [core_initialized, recovery]}, + {enables, routing_ready}]}). + +-rabbit_boot_step({pre_flight, + [{description, "ready to communicate with peers and clients"}, + {requires, [core_initialized, recovery, routing_ready]}]}). + +-rabbit_boot_step({direct_client, + [{description, "direct client"}, + {mfa, {rabbit_direct, boot, []}}, + {requires, pre_flight} + ]}). + +-rabbit_boot_step({notify_cluster, + [{description, "notifies cluster peers of our presence"}, + {mfa, {rabbit_node_monitor, notify_node_up, []}}, + {requires, pre_flight}]}). + +-rabbit_boot_step({networking, + [{description, "TCP and TLS listeners"}, + {mfa, {rabbit_networking, boot, []}}, + {requires, notify_cluster}]}). %%--------------------------------------------------------------------------- diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl index 6ae0512a47..acc5f678e2 100644 --- a/src/rabbit_auth_mechanism_amqplain.erl +++ b/src/rabbit_auth_mechanism_amqplain.erl @@ -41,13 +41,21 @@ should_offer(_Sock) -> init(_Sock) -> []. +-define(IS_STRING_TYPE(Type), Type =:= longstr orelse Type =:= shortstr). + handle_response(Response, _State) -> LoginTable = rabbit_binary_parser:parse_table(Response), case {lists:keysearch(<<"LOGIN">>, 1, LoginTable), lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of - {{value, {_, longstr, User}}, - {value, {_, longstr, Pass}}} -> + {{value, {_, UserType, User}}, + {value, {_, PassType, Pass}}} when ?IS_STRING_TYPE(UserType); + ?IS_STRING_TYPE(PassType) -> rabbit_access_control:check_user_pass_login(User, Pass); + {{value, {_, _UserType, _User}}, + {value, {_, _PassType, _Pass}}} -> + {protocol_error, + "AMQPLAIN auth info ~w uses unsupported type for LOGIN or PASSWORD field", + [LoginTable]}; _ -> {protocol_error, "AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field", diff --git a/src/rabbit_boot_steps.erl b/src/rabbit_boot_steps.erl index 21366877db..24ff878165 100644 --- a/src/rabbit_boot_steps.erl +++ b/src/rabbit_boot_steps.erl @@ -23,7 +23,10 @@ run_boot_steps() -> run_boot_steps(loaded_applications()). run_boot_steps(Apps) -> - [ok = run_step(Attrs, mfa) || {_, _, Attrs} <- find_steps(Apps)], + [begin + rabbit_log:info("Running boot step ~s defined by app ~s", [Step, App]), + ok = run_step(Attrs, mfa) + end || {App, Step, Attrs} <- find_steps(Apps)], ok. run_cleanup_steps(Apps) -> @@ -46,10 +49,14 @@ run_step(Attributes, AttributeName) -> [] -> ok; MFAs -> - [case apply(M,F,A) of - ok -> ok; - {error, Reason} -> exit({error, Reason}) - end || {M,F,A} <- MFAs], + [begin + rabbit_log:debug("Applying MFA: M = ~s, F = ~s, A = ~p", + [M, F, A]), + case apply(M,F,A) of + ok -> ok; + {error, Reason} -> exit({error, Reason}) + end + end || {M,F,A} <- MFAs], ok end. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 97e5f6e901..e551834490 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1408,7 +1408,8 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, dehydrate_state(#?MODULE{messages = Messages, consumers = Consumers, returns = Returns, - prefix_msgs = {PrefRet0, PrefMsg0}} = State) -> + prefix_msgs = {PrefRet0, PrefMsg0}, + waiting_consumers = Waiting0} = State) -> %% TODO: optimise this function as far as possible PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) -> [Header | Acc]; @@ -1422,6 +1423,7 @@ dehydrate_state(#?MODULE{messages = Messages, end, lists:reverse(PrefMsg0), lists:sort(maps:to_list(Messages))), + Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0], State#?MODULE{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), release_cursors = lqueue:new(), @@ -1431,7 +1433,8 @@ dehydrate_state(#?MODULE{messages = Messages, end, Consumers), returns = lqueue:new(), prefix_msgs = {lists:reverse(PrefRet), - lists:reverse(PrefMsgs)}}. + lists:reverse(PrefMsgs)}, + waiting_consumers = Waiting}. dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> Checked = maps:map(fun (_, {'$prefix_msg', _} = M) -> diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl index bfdd26fee3..45f2cfbc40 100644 --- a/src/rabbit_lager.erl +++ b/src/rabbit_lager.erl @@ -63,26 +63,35 @@ set_log_level(Level) -> set_log_level(true, Level) -> SinksAndHandlers = [{Sink, gen_event:which_handlers(Sink)} || Sink <- lager:list_all_sinks()], - set_sink_log_level(SinksAndHandlers, Level); + DefaultHwm = application:get_env(lager, error_logger_hwm_original, 50), + Hwm = case Level of + debug -> DefaultHwm * 100; + _ -> DefaultHwm + end, + application:set_env(lager, error_logger_hwm, Hwm), + set_sink_log_level(SinksAndHandlers, Level, Hwm); set_log_level(_, Level) -> {error, {invalid_log_level, Level}}. -set_sink_log_level([], _Level) -> +set_sink_log_level([], _Level, _Hwm) -> ok; -set_sink_log_level([{Sink, Handlers}|Rest], Level) -> - set_sink_handler_log_level(Sink, Handlers, Level), - set_sink_log_level(Rest, Level). +set_sink_log_level([{Sink, Handlers}|Rest], Level, Hwm) -> + set_sink_handler_log_level(Sink, Handlers, Level, Hwm), + set_sink_log_level(Rest, Level, Hwm). -set_sink_handler_log_level(_Sink, [], _Level) -> +set_sink_handler_log_level(_Sink, [], _Level, _Hwm) -> ok; -set_sink_handler_log_level(Sink, [Handler|Rest], Level) when is_atom(Handler) -> +set_sink_handler_log_level(Sink, [Handler|Rest], Level, Hwm) + when is_atom(Handler) andalso is_integer(Hwm) -> + lager:set_loghwm(Sink, Handler, undefined, Hwm), ok = lager:set_loglevel(Sink, Handler, undefined, Level), - set_sink_handler_log_level(Sink, Rest, Level); -set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level) -> + set_sink_handler_log_level(Sink, Rest, Level, Hwm); +set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level, Hwm) -> + lager:set_loghwm(Sink, Handler, Id, Hwm), ok = lager:set_loglevel(Sink, Handler, Id, Level), - set_sink_handler_log_level(Sink, Rest, Level); -set_sink_handler_log_level(Sink, [_|Rest], Level) -> - set_sink_handler_log_level(Sink, Rest, Level). + set_sink_handler_log_level(Sink, Rest, Level, Hwm); +set_sink_handler_log_level(Sink, [_|Rest], Level, Hwm) -> + set_sink_handler_log_level(Sink, Rest, Level, Hwm). log_locations() -> ensure_lager_configured(), @@ -293,7 +302,7 @@ configure_lager() -> {ok, Val} when is_integer(Val) andalso Val < 1000 -> ok = application:set_env(lager, error_logger_hwm, 1000), ok = application:set_env(lager, error_logger_hwm_original, Val); - {ok, Val} -> + {ok, Val} when is_integer(Val) -> ok = application:set_env(lager, error_logger_hwm_original, Val), ok end, diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index 1721c9b806..6652fb8d5f 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -42,8 +42,7 @@ recover() -> %% So recovery will be run every time a vhost supervisor is restarted. ok = rabbit_vhost_sup_sup:start(), - [ ok = rabbit_vhost_sup_sup:init_vhost(VHost) - || VHost <- rabbit_vhost:list()], + [ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- rabbit_vhost:list()], ok. recover(VHost) -> |
