summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/lager_exchange_backend.erl25
-rw-r--r--src/rabbit.erl6
2 files changed, 26 insertions, 5 deletions
diff --git a/src/lager_exchange_backend.erl b/src/lager_exchange_backend.erl
index 9eec237f8f..6a226bcb72 100644
--- a/src/lager_exchange_backend.erl
+++ b/src/lager_exchange_backend.erl
@@ -28,8 +28,10 @@
-behaviour(gen_event).
--export([init/1, terminate/2, code_change/3, handle_call/2, handle_event/2,
- handle_info/2]).
+-export([init/1, terminate/2, code_change/3,
+ handle_call/2, handle_event/2, handle_info/2]).
+
+-export([maybe_init_exchange/0]).
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
@@ -82,6 +84,7 @@ init(Options) when is_list(Options) ->
State0 = #state{level=L,
formatter=Formatter,
format_config=Config},
+ % NB: this will probably always fail since the / vhost isn't available
State1 = maybe_init_exchange(State0),
{ok, State1}
catch
@@ -94,6 +97,18 @@ init(Level) when is_atom(Level) ->
init(Other) ->
{error, {fatal, {bad_lager_exchange_backend_config, Other}}}.
+% rabbitmq/rabbitmq-server#1973
+% This is called immediatly after the / vhost is created
+% or recovered
+maybe_init_exchange() ->
+ case lists:member(?MODULE, gen_event:which_handlers(lager_event)) of
+ true ->
+ _ = init_exchange(true),
+ ok;
+ _ ->
+ ok
+ end.
+
validate_options([]) -> true;
validate_options([{level, L}|T]) when is_atom(L) ->
case lists:member(L, ?LEVELS) of
@@ -179,7 +194,10 @@ maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=undefined} = Sta
handle_init_exchange(init_exchange(true), Now, State);
maybe_init_exchange(#state{exchange=undefined, init_exchange_ts=Timestamp} = State) ->
Now = erlang:monotonic_time(second),
- Result = init_exchange(Now - Timestamp > ?INIT_EXCHANGE_INTERVAL_SECS),
+ % NB: since we may try to declare the exchange on every log message, this ensures
+ % that we only try once every 5 seconds
+ HasEnoughTimeElapsed = Now - Timestamp > ?INIT_EXCHANGE_INTERVAL_SECS,
+ Result = init_exchange(HasEnoughTimeElapsed),
handle_init_exchange(Result, Now, State);
maybe_init_exchange(State) ->
State.
@@ -191,6 +209,7 @@ init_exchange(true) ->
try
%% durable
#exchange{} = rabbit_exchange:declare(Exchange, topic, true, false, true, [], ?INTERNAL_USER),
+ rabbit_log:info("Declared exchange '~s' in vhost '~s'", [?LOG_EXCH_NAME, DefaultVHost]),
{ok, Exchange}
catch
ErrType:Err ->
diff --git a/src/rabbit.erl b/src/rabbit.erl
index dafdaed5c9..1210e467ec 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -1031,8 +1031,9 @@ boot_delegate() ->
-spec recover() -> 'ok'.
recover() ->
- rabbit_policy:recover(),
- rabbit_vhost:recover().
+ ok = rabbit_policy:recover(),
+ ok = rabbit_vhost:recover(),
+ ok = lager_exchange_backend:maybe_init_exchange().
-spec maybe_insert_default_data() -> 'ok'.
@@ -1058,6 +1059,7 @@ insert_default_data() ->
DefaultReadPermBin = rabbit_data_coercion:to_binary(DefaultReadPerm),
ok = rabbit_vhost:add(DefaultVHostBin, ?INTERNAL_USER),
+ ok = lager_exchange_backend:maybe_init_exchange(),
ok = rabbit_auth_backend_internal:add_user(
DefaultUserBin,
DefaultPassBin,