summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2019-03-15 22:08:55 +0300
committerMichael Klishin <michael@clojurewerkz.org>2019-03-15 22:08:55 +0300
commit2a26daab5997b9aa41087ea94edf02d510469521 (patch)
treef35b53eaff99dd483e074a86e412e2ccfe96dc45 /src
parent6dce82e7c5847bfb01c380912b4a9d53d0deb9f7 (diff)
parent2f8053da4213e4d4d5df5c2f6dc6871b7054d844 (diff)
downloadrabbitmq-server-git-2a26daab5997b9aa41087ea94edf02d510469521.tar.gz
Merge branch 'master' into rabbitmq-server-1906
Diffstat (limited to 'src')
-rw-r--r--src/amqqueue.erl2
-rw-r--r--src/rabbit.erl49
-rw-r--r--src/rabbit_auth_mechanism_amqplain.erl12
-rw-r--r--src/rabbit_boot_steps.erl17
-rw-r--r--src/rabbit_fifo.erl7
-rw-r--r--src/rabbit_lager.erl35
-rw-r--r--src/rabbit_vhost.erl3
7 files changed, 80 insertions, 45 deletions
diff --git a/src/amqqueue.erl b/src/amqqueue.erl
index b9c278cdab..d0f3a45aca 100644
--- a/src/amqqueue.erl
+++ b/src/amqqueue.erl
@@ -431,7 +431,7 @@ set_pid(Queue, Pid) ->
% policy
--spec get_policy(amqqueue()) -> binary() | none | undefined.
+-spec get_policy(amqqueue()) -> proplists:proplist() | none | undefined.
get_policy(#amqqueue{policy = Policy}) -> Policy;
get_policy(Queue) -> amqqueue_v1:get_policy(Queue).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 20f9b17abf..003abd141f 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -197,43 +197,52 @@
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"},
- {requires, core_initialized}]}).
-
--rabbit_boot_step({direct_client,
- [{description, "direct client"},
- {mfa, {rabbit_direct, boot, []}},
- {requires, routing_ready}]}).
+ {requires, [core_initialized, recovery]}]}).
-rabbit_boot_step({connection_tracking,
- [{description, "sets up internal storage for node-local connections"},
+ [{description, "connection tracking infrastructure"},
{mfa, {rabbit_connection_tracking, boot, []}},
- {requires, routing_ready}]}).
-
--rabbit_boot_step({networking,
- [{mfa, {rabbit_networking, boot, []}},
- {requires, routing_ready}]}).
-
--rabbit_boot_step({notify_cluster,
- [{description, "notify cluster nodes"},
- {mfa, {rabbit_node_monitor, notify_node_up, []}},
- {requires, networking}]}).
+ {enables, routing_ready}]}).
-rabbit_boot_step({background_gc,
[{description, "background garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
[background_gc]}},
- {enables, networking}]}).
+ {requires, [core_initialized, recovery]},
+ {enables, routing_ready}]}).
-rabbit_boot_step({rabbit_core_metrics_gc,
[{description, "background core metrics garbage collection"},
{mfa, {rabbit_sup, start_restartable_child,
[rabbit_core_metrics_gc]}},
- {enables, networking}]}).
+ {requires, [core_initialized, recovery]},
+ {enables, routing_ready}]}).
-rabbit_boot_step({rabbit_looking_glass,
[{description, "Looking Glass tracer and profiler"},
{mfa, {rabbit_looking_glass, boot, []}},
- {requires, networking}]}).
+ {requires, [core_initialized, recovery]},
+ {enables, routing_ready}]}).
+
+-rabbit_boot_step({pre_flight,
+ [{description, "ready to communicate with peers and clients"},
+ {requires, [core_initialized, recovery, routing_ready]}]}).
+
+-rabbit_boot_step({direct_client,
+ [{description, "direct client"},
+ {mfa, {rabbit_direct, boot, []}},
+ {requires, pre_flight}
+ ]}).
+
+-rabbit_boot_step({notify_cluster,
+ [{description, "notifies cluster peers of our presence"},
+ {mfa, {rabbit_node_monitor, notify_node_up, []}},
+ {requires, pre_flight}]}).
+
+-rabbit_boot_step({networking,
+ [{description, "TCP and TLS listeners"},
+ {mfa, {rabbit_networking, boot, []}},
+ {requires, notify_cluster}]}).
%%---------------------------------------------------------------------------
diff --git a/src/rabbit_auth_mechanism_amqplain.erl b/src/rabbit_auth_mechanism_amqplain.erl
index 6ae0512a47..acc5f678e2 100644
--- a/src/rabbit_auth_mechanism_amqplain.erl
+++ b/src/rabbit_auth_mechanism_amqplain.erl
@@ -41,13 +41,21 @@ should_offer(_Sock) ->
init(_Sock) ->
[].
+-define(IS_STRING_TYPE(Type), Type =:= longstr orelse Type =:= shortstr).
+
handle_response(Response, _State) ->
LoginTable = rabbit_binary_parser:parse_table(Response),
case {lists:keysearch(<<"LOGIN">>, 1, LoginTable),
lists:keysearch(<<"PASSWORD">>, 1, LoginTable)} of
- {{value, {_, longstr, User}},
- {value, {_, longstr, Pass}}} ->
+ {{value, {_, UserType, User}},
+ {value, {_, PassType, Pass}}} when ?IS_STRING_TYPE(UserType);
+ ?IS_STRING_TYPE(PassType) ->
rabbit_access_control:check_user_pass_login(User, Pass);
+ {{value, {_, _UserType, _User}},
+ {value, {_, _PassType, _Pass}}} ->
+ {protocol_error,
+ "AMQPLAIN auth info ~w uses unsupported type for LOGIN or PASSWORD field",
+ [LoginTable]};
_ ->
{protocol_error,
"AMQPLAIN auth info ~w is missing LOGIN or PASSWORD field",
diff --git a/src/rabbit_boot_steps.erl b/src/rabbit_boot_steps.erl
index 21366877db..24ff878165 100644
--- a/src/rabbit_boot_steps.erl
+++ b/src/rabbit_boot_steps.erl
@@ -23,7 +23,10 @@ run_boot_steps() ->
run_boot_steps(loaded_applications()).
run_boot_steps(Apps) ->
- [ok = run_step(Attrs, mfa) || {_, _, Attrs} <- find_steps(Apps)],
+ [begin
+ rabbit_log:info("Running boot step ~s defined by app ~s", [Step, App]),
+ ok = run_step(Attrs, mfa)
+ end || {App, Step, Attrs} <- find_steps(Apps)],
ok.
run_cleanup_steps(Apps) ->
@@ -46,10 +49,14 @@ run_step(Attributes, AttributeName) ->
[] ->
ok;
MFAs ->
- [case apply(M,F,A) of
- ok -> ok;
- {error, Reason} -> exit({error, Reason})
- end || {M,F,A} <- MFAs],
+ [begin
+ rabbit_log:debug("Applying MFA: M = ~s, F = ~s, A = ~p",
+ [M, F, A]),
+ case apply(M,F,A) of
+ ok -> ok;
+ {error, Reason} -> exit({error, Reason})
+ end
+ end || {M,F,A} <- MFAs],
ok
end.
diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl
index 97e5f6e901..e551834490 100644
--- a/src/rabbit_fifo.erl
+++ b/src/rabbit_fifo.erl
@@ -1408,7 +1408,8 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit},
dehydrate_state(#?MODULE{messages = Messages,
consumers = Consumers,
returns = Returns,
- prefix_msgs = {PrefRet0, PrefMsg0}} = State) ->
+ prefix_msgs = {PrefRet0, PrefMsg0},
+ waiting_consumers = Waiting0} = State) ->
%% TODO: optimise this function as far as possible
PrefRet = lists:foldl(fun ({'$prefix_msg', Header}, Acc) ->
[Header | Acc];
@@ -1422,6 +1423,7 @@ dehydrate_state(#?MODULE{messages = Messages,
end,
lists:reverse(PrefMsg0),
lists:sort(maps:to_list(Messages))),
+ Waiting = [{Cid, dehydrate_consumer(C)} || {Cid, C} <- Waiting0],
State#?MODULE{messages = #{},
ra_indexes = rabbit_fifo_index:empty(),
release_cursors = lqueue:new(),
@@ -1431,7 +1433,8 @@ dehydrate_state(#?MODULE{messages = Messages,
end, Consumers),
returns = lqueue:new(),
prefix_msgs = {lists:reverse(PrefRet),
- lists:reverse(PrefMsgs)}}.
+ lists:reverse(PrefMsgs)},
+ waiting_consumers = Waiting}.
dehydrate_consumer(#consumer{checked_out = Checked0} = Con) ->
Checked = maps:map(fun (_, {'$prefix_msg', _} = M) ->
diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl
index bfdd26fee3..45f2cfbc40 100644
--- a/src/rabbit_lager.erl
+++ b/src/rabbit_lager.erl
@@ -63,26 +63,35 @@ set_log_level(Level) ->
set_log_level(true, Level) ->
SinksAndHandlers = [{Sink, gen_event:which_handlers(Sink)} ||
Sink <- lager:list_all_sinks()],
- set_sink_log_level(SinksAndHandlers, Level);
+ DefaultHwm = application:get_env(lager, error_logger_hwm_original, 50),
+ Hwm = case Level of
+ debug -> DefaultHwm * 100;
+ _ -> DefaultHwm
+ end,
+ application:set_env(lager, error_logger_hwm, Hwm),
+ set_sink_log_level(SinksAndHandlers, Level, Hwm);
set_log_level(_, Level) ->
{error, {invalid_log_level, Level}}.
-set_sink_log_level([], _Level) ->
+set_sink_log_level([], _Level, _Hwm) ->
ok;
-set_sink_log_level([{Sink, Handlers}|Rest], Level) ->
- set_sink_handler_log_level(Sink, Handlers, Level),
- set_sink_log_level(Rest, Level).
+set_sink_log_level([{Sink, Handlers}|Rest], Level, Hwm) ->
+ set_sink_handler_log_level(Sink, Handlers, Level, Hwm),
+ set_sink_log_level(Rest, Level, Hwm).
-set_sink_handler_log_level(_Sink, [], _Level) ->
+set_sink_handler_log_level(_Sink, [], _Level, _Hwm) ->
ok;
-set_sink_handler_log_level(Sink, [Handler|Rest], Level) when is_atom(Handler) ->
+set_sink_handler_log_level(Sink, [Handler|Rest], Level, Hwm)
+ when is_atom(Handler) andalso is_integer(Hwm) ->
+ lager:set_loghwm(Sink, Handler, undefined, Hwm),
ok = lager:set_loglevel(Sink, Handler, undefined, Level),
- set_sink_handler_log_level(Sink, Rest, Level);
-set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level) ->
+ set_sink_handler_log_level(Sink, Rest, Level, Hwm);
+set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level, Hwm) ->
+ lager:set_loghwm(Sink, Handler, Id, Hwm),
ok = lager:set_loglevel(Sink, Handler, Id, Level),
- set_sink_handler_log_level(Sink, Rest, Level);
-set_sink_handler_log_level(Sink, [_|Rest], Level) ->
- set_sink_handler_log_level(Sink, Rest, Level).
+ set_sink_handler_log_level(Sink, Rest, Level, Hwm);
+set_sink_handler_log_level(Sink, [_|Rest], Level, Hwm) ->
+ set_sink_handler_log_level(Sink, Rest, Level, Hwm).
log_locations() ->
ensure_lager_configured(),
@@ -293,7 +302,7 @@ configure_lager() ->
{ok, Val} when is_integer(Val) andalso Val < 1000 ->
ok = application:set_env(lager, error_logger_hwm, 1000),
ok = application:set_env(lager, error_logger_hwm_original, Val);
- {ok, Val} ->
+ {ok, Val} when is_integer(Val) ->
ok = application:set_env(lager, error_logger_hwm_original, Val),
ok
end,
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 1721c9b806..6652fb8d5f 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -42,8 +42,7 @@ recover() ->
%% So recovery will be run every time a vhost supervisor is restarted.
ok = rabbit_vhost_sup_sup:start(),
- [ ok = rabbit_vhost_sup_sup:init_vhost(VHost)
- || VHost <- rabbit_vhost:list()],
+ [ok = rabbit_vhost_sup_sup:init_vhost(VHost) || VHost <- rabbit_vhost:list()],
ok.
recover(VHost) ->