summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon MacMullen <simon@rabbitmq.com>2011-09-28 12:18:50 +0100
committerSimon MacMullen <simon@rabbitmq.com>2011-09-28 12:18:50 +0100
commit2ecc9baddaa05ee3efd39d76a024db94b4c393ba (patch)
treeb289399ee537d630b837215be062bfdb7b079a9e
parent9fe249063c21f378ac6cb38027fd8dd62b901721 (diff)
parent39496a12d997af16f5ba7928cbc5933105ed57c3 (diff)
downloadrabbitmq-server-git-2ecc9baddaa05ee3efd39d76a024db94b4c393ba.tar.gz
Merge bug24371
-rw-r--r--docs/html-to-website-xml.xsl2
-rw-r--r--docs/rabbitmqctl.1.xml33
-rwxr-xr-xscripts/rabbitmq-server10
-rw-r--r--scripts/rabbitmq-server.bat14
-rw-r--r--scripts/rabbitmq-service.bat14
-rw-r--r--src/file_handle_cache.erl114
-rw-r--r--src/rabbit.erl21
-rw-r--r--src/rabbit_amqqueue_process.erl224
-rw-r--r--src/rabbit_auth_backend_internal.erl12
-rw-r--r--src/rabbit_binding.erl2
-rw-r--r--src/rabbit_error_logger_file_h.erl38
-rw-r--r--src/rabbit_file.erl282
-rw-r--r--src/rabbit_guid.erl4
-rw-r--r--src/rabbit_misc.erl174
-rw-r--r--src/rabbit_mnesia.erl29
-rw-r--r--src/rabbit_msg_store.erl8
-rw-r--r--src/rabbit_networking.erl1
-rw-r--r--src/rabbit_prelaunch.erl4
-rw-r--r--src/rabbit_queue_index.erl34
-rw-r--r--src/rabbit_sasl_report_file_h.erl24
-rw-r--r--src/rabbit_tests.erl56
-rw-r--r--src/rabbit_trace.erl2
-rw-r--r--src/rabbit_upgrade.erl4
-rw-r--r--src/rabbit_version.erl4
-rw-r--r--src/rabbit_vhost.erl4
25 files changed, 631 insertions, 483 deletions
diff --git a/docs/html-to-website-xml.xsl b/docs/html-to-website-xml.xsl
index 4bfcf6ca50..88aa2e78f7 100644
--- a/docs/html-to-website-xml.xsl
+++ b/docs/html-to-website-xml.xsl
@@ -6,7 +6,7 @@
<xsl:param name="original"/>
-<xsl:output method="xml" doctype-public="bug in xslt processor requires fake doctype" doctype-system="otherwise css isn't included" />
+<xsl:output method="xml" />
<xsl:template match="*"/>
diff --git a/docs/rabbitmqctl.1.xml b/docs/rabbitmqctl.1.xml
index 3291c44d37..1f8cf28eb9 100644
--- a/docs/rabbitmqctl.1.xml
+++ b/docs/rabbitmqctl.1.xml
@@ -251,29 +251,28 @@
Instruct the RabbitMQ node to rotate the log files.
</para>
<para>
- The RabbitMQ broker will attempt to append the current contents
- of the log file to the file with name composed of the original
- name and the suffix.
- It will create a new file if such a file does not already exist.
- When no <option>suffix</option> is specified, the empty log file is
- simply created at the original location; no rotation takes place.
+ The RabbitMQ broker appends the contents of its log
+ files to files with names composed of the original name
+ and the suffix, and then resumes logging to freshly
+ created files at the original location. I.e. effectively
+ the current log contents are moved to the end of the
+ suffixed files.
</para>
<para>
- When an error occurs while appending the contents of the old log
- file, the operation behaves in the same way as if no <option>suffix</option> was
- specified.
- </para>
- <para>
- This command might be helpful when you are e.g. writing your
- own logrotate script and you do not want to restart the RabbitMQ
- node.
When the target files do not exist they are created. When
+ no <option>suffix</option> is specified, the empty log
+ files are simply created at the original location; no
+ rotation takes place.
</para>
<para role="example-prefix">For example:</para>
<screen role="example">rabbitmqctl rotate_logs .1</screen>
<para role="example">
- This command instructs the RabbitMQ node to append the current content
- of the log files to the files with names consisting of the original logs'
- names and ".1" suffix, e.g. rabbit.log.1. Finally, the old log files are reopened.
+ This command instructs the RabbitMQ node to append the contents
+ of the log files to files with names consisting of the original logs'
+ names and ".1" suffix, e.g. rabbit@mymachine.log.1 and
+ rabbit@mymachine-sasl.log.1. Finally, logging resumes to
+ fresh files at the old locations.
</para>
</listitem>
</varlistentry>
diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server
index 7176d80130..deca5b3042 100755
--- a/scripts/rabbitmq-server
+++ b/scripts/rabbitmq-server
@@ -59,11 +59,6 @@ fi
[ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS=${SASL_LOGS}
[ "x" = "x$RABBITMQ_SASL_LOGS" ] && RABBITMQ_SASL_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}-sasl.log"
-[ "x" = "x$RABBITMQ_BACKUP_EXTENSION" ] && RABBITMQ_BACKUP_EXTENSION=${BACKUP_EXTENSION}
-[ "x" = "x$RABBITMQ_BACKUP_EXTENSION" ] && RABBITMQ_BACKUP_EXTENSION=".1"
-
-[ -f "${RABBITMQ_LOGS}" ] && cat "${RABBITMQ_LOGS}" >> "${RABBITMQ_LOGS}${RABBITMQ_BACKUP_EXTENSION}"
-[ -f "${RABBITMQ_SASL_LOGS}" ] && cat "${RABBITMQ_SASL_LOGS}" >> "${RABBITMQ_SASL_LOGS}${RABBITMQ_BACKUP_EXTENSION}"
RABBITMQ_START_RABBIT=
[ "x" = "x$RABBITMQ_ALLOW_INPUT" ] && RABBITMQ_START_RABBIT='-noinput'
@@ -111,8 +106,9 @@ exec erl \
${RABBITMQ_SERVER_ERL_ARGS} \
${RABBITMQ_LISTEN_ARG} \
-sasl errlog_type error \
- -kernel error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
- -sasl sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
+ -sasl sasl_error_logger false \
+ -rabbit error_logger '{file,"'${RABBITMQ_LOGS}'"}' \
+ -rabbit sasl_error_logger '{file,"'${RABBITMQ_SASL_LOGS}'"}' \
-os_mon start_cpu_sup true \
-os_mon start_disksup false \
-os_mon start_memsup false \
diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat
index 84d24c458b..56bed4358a 100644
--- a/scripts/rabbitmq-server.bat
+++ b/scripts/rabbitmq-server.bat
@@ -70,18 +70,9 @@ if "!RABBITMQ_LOG_BASE!"=="" (
rem We save the previous logs in their respective backup
rem Log management (rotation, filtering based on size...) is left as an exercise for the user.
-set BACKUP_EXTENSION=.1
-
set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-if exist "!LOGS!" (
- type "!LOGS!" >> "!LOGS!!BACKUP_EXTENSION!"
-)
-if exist "!SASL_LOGS!" (
- type "!SASL_LOGS!" >> "!SASL_LOGS!!BACKUP_EXTENSION!"
-)
-
rem End of log management
@@ -142,8 +133,9 @@ if not "!RABBITMQ_NODE_IP_ADDRESS!"=="" (
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
--sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat
index 60697d0b49..26c6ea6545 100644
--- a/scripts/rabbitmq-service.bat
+++ b/scripts/rabbitmq-service.bat
@@ -103,18 +103,9 @@ if "!RABBITMQ_LOG_BASE!"=="" (
rem We save the previous logs in their respective backup
rem Log management (rotation, filtering based on size...) is left as an exercise for the user.
-set BACKUP_EXTENSION=.1
-
set LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!.log
set SASL_LOGS=!RABBITMQ_LOG_BASE!\!RABBITMQ_NODENAME!-sasl.log
-if exist "!LOGS!" (
- type "!LOGS!" >> "!LOGS!!BACKUP_EXTENSION!"
-)
-if exist "!SASL_LOGS!" (
- type "!SASL_LOGS!" >> "!SASL_LOGS!!BACKUP_EXTENSION!"
-)
-
rem End of log management
@@ -208,8 +199,9 @@ set ERLANG_SERVICE_ARGUMENTS= ^
!RABBITMQ_LISTEN_ARG! ^
!RABBITMQ_SERVER_ERL_ARGS! ^
-sasl errlog_type error ^
--kernel error_logger {file,\""!LOGS:\=/!"\"} ^
--sasl sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
+-sasl sasl_error_logger false ^
+-rabbit error_logger {file,\""!LOGS:\=/!"\"} ^
+-rabbit sasl_error_logger {file,\""!SASL_LOGS:\=/!"\"} ^
-os_mon start_cpu_sup true ^
-os_mon start_disksup false ^
-os_mon start_memsup false ^
diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl
index e14dfe22d3..6c3f1b5f50 100644
--- a/src/file_handle_cache.erl
+++ b/src/file_handle_cache.erl
@@ -120,37 +120,39 @@
%% do not need to worry about their handles being closed by the server
%% - reopening them when necessary is handled transparently.
%%
-%% The server also supports obtain and transfer. obtain/0 blocks until
-%% a file descriptor is available, at which point the requesting
-%% process is considered to 'own' one more descriptor. transfer/1
-%% transfers ownership of a file descriptor between processes. It is
-%% non-blocking. Obtain is used to obtain permission to accept file
-%% descriptors. Obtain has a lower limit, set by the ?OBTAIN_LIMIT/1
-%% macro. File handles can use the entire limit, but will be evicted
-%% by obtain calls up to the point at which no more obtain calls can
-%% be satisfied by the obtains limit. Thus there will always be some
-%% capacity available for file handles. Processes that use obtain are
-%% never asked to return them, and they are not managed in any way by
-%% the server. It is simply a mechanism to ensure that processes that
-%% need file descriptors such as sockets can do so in such a way that
-%% the overall number of open file descriptors is managed.
+%% The server also supports obtain, release and transfer. obtain/0
+%% blocks until a file descriptor is available, at which point the
+%% requesting process is considered to 'own' one more
+%% descriptor. release/0 is the inverse operation and releases a
+%% previously obtained descriptor. transfer/1 transfers ownership of a
+%% file descriptor between processes. It is non-blocking. Obtain is
+%% used to obtain permission to accept file descriptors. Obtain has a
+%% lower limit, set by the ?OBTAIN_LIMIT/1 macro. File handles can use
+%% the entire limit, but will be evicted by obtain calls up to the
+%% point at which no more obtain calls can be satisfied by the obtains
+%% limit. Thus there will always be some capacity available for file
+%% handles. Processes that use obtain are never asked to return them,
+%% and they are not managed in any way by the server. It is simply a
+%% mechanism to ensure that processes that need file descriptors such
+%% as sockets can do so in such a way that the overall number of open
+%% file descriptors is managed.
%%
%% The callers of register_callback/3, obtain/0, and the argument of
%% transfer/1 are monitored, reducing the count of handles in use
%% appropriately when the processes terminate.
--behaviour(gen_server).
+-behaviour(gen_server2).
-export([register_callback/3]).
-export([open/3, close/1, read/2, append/2, sync/1, position/2, truncate/1,
current_virtual_offset/1, current_raw_offset/1, flush/1, copy/3,
set_maximum_since_use/1, delete/1, clear/1]).
--export([obtain/0, transfer/1, set_limit/1, get_limit/0, info_keys/0, info/0,
- info/1]).
+-export([obtain/0, release/0, transfer/1, set_limit/1, get_limit/0, info_keys/0,
+ info/0, info/1]).
-export([ulimit/0]).
-export([start_link/0, init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+ terminate/2, code_change/3, prioritise_cast/2]).
-define(SERVER, ?MODULE).
-define(RESERVED_FOR_OTHERS, 100).
@@ -248,6 +250,7 @@
-spec(clear/1 :: (ref()) -> ok_or_error()).
-spec(set_maximum_since_use/1 :: (non_neg_integer()) -> 'ok').
-spec(obtain/0 :: () -> 'ok').
+-spec(release/0 :: () -> 'ok').
-spec(transfer/1 :: (pid()) -> 'ok').
-spec(set_limit/1 :: (non_neg_integer()) -> 'ok').
-spec(get_limit/0 :: () -> non_neg_integer()).
@@ -266,11 +269,11 @@
%%----------------------------------------------------------------------------
start_link() ->
- gen_server:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
+ gen_server2:start_link({local, ?SERVER}, ?MODULE, [], [{timeout, infinity}]).
register_callback(M, F, A)
when is_atom(M) andalso is_atom(F) andalso is_list(A) ->
- gen_server:cast(?SERVER, {register_callback, self(), {M, F, A}}).
+ gen_server2:cast(?SERVER, {register_callback, self(), {M, F, A}}).
open(Path, Mode, Options) ->
Path1 = filename:absname(Path),
@@ -318,7 +321,7 @@ read(Ref, Count) ->
fun ([#handle { is_read = false }]) ->
{error, not_open_for_reading};
([Handle = #handle { hdl = Hdl, offset = Offset }]) ->
- case file:read(Hdl, Count) of
+ case prim_file:read(Hdl, Count) of
{ok, Data} = Obj -> Offset1 = Offset + iolist_size(Data),
{Obj,
[Handle #handle { offset = Offset1 }]};
@@ -338,7 +341,7 @@ append(Ref, Data) ->
write_buffer_size_limit = 0,
at_eof = true } = Handle1} ->
Offset1 = Offset + iolist_size(Data),
- {file:write(Hdl, Data),
+ {prim_file:write(Hdl, Data),
[Handle1 #handle { is_dirty = true, offset = Offset1 }]};
{{ok, _Offset}, #handle { write_buffer = WriteBuffer,
write_buffer_size = Size,
@@ -365,7 +368,7 @@ sync(Ref) ->
ok;
([Handle = #handle { hdl = Hdl,
is_dirty = true, write_buffer = [] }]) ->
- case file:sync(Hdl) of
+ case prim_file:sync(Hdl) of
ok -> {ok, [Handle #handle { is_dirty = false }]};
Error -> {Error, [Handle]}
end
@@ -382,7 +385,7 @@ truncate(Ref) ->
with_flushed_handles(
[Ref],
fun ([Handle1 = #handle { hdl = Hdl }]) ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end
@@ -409,7 +412,7 @@ copy(Src, Dest, Count) ->
fun ([SHandle = #handle { is_read = true, hdl = SHdl, offset = SOffset },
DHandle = #handle { is_write = true, hdl = DHdl, offset = DOffset }]
) ->
- case file:copy(SHdl, DHdl, Count) of
+ case prim_file:copy(SHdl, DHdl, Count) of
{ok, Count1} = Result1 ->
{Result1,
[SHandle #handle { offset = SOffset + Count1 },
@@ -429,7 +432,7 @@ delete(Ref) ->
Handle = #handle { path = Path } ->
case hard_close(Handle #handle { is_dirty = false,
write_buffer = [] }) of
- ok -> file:delete(Path);
+ ok -> prim_file:delete(Path);
{Error, Handle1} -> put_handle(Ref, Handle1),
Error
end
@@ -444,7 +447,7 @@ clear(Ref) ->
case maybe_seek(bof, Handle #handle { write_buffer = [],
write_buffer_size = 0 }) of
{{ok, 0}, Handle1 = #handle { hdl = Hdl }} ->
- case file:truncate(Hdl) of
+ case prim_file:truncate(Hdl) of
ok -> {ok, [Handle1 #handle { at_eof = true }]};
Error -> {Error, [Handle1]}
end;
@@ -471,21 +474,28 @@ set_maximum_since_use(MaximumAge) ->
end.
obtain() ->
- gen_server:call(?SERVER, {obtain, self()}, infinity).
+ %% If the FHC isn't running, obtains succeed immediately.
+ case whereis(?SERVER) of
+ undefined -> ok;
+ _ -> gen_server2:call(?SERVER, {obtain, self()}, infinity)
+ end.
+
+release() ->
+ gen_server2:cast(?SERVER, {release, self()}).
transfer(Pid) ->
- gen_server:cast(?SERVER, {transfer, self(), Pid}).
+ gen_server2:cast(?SERVER, {transfer, self(), Pid}).
set_limit(Limit) ->
- gen_server:call(?SERVER, {set_limit, Limit}, infinity).
+ gen_server2:call(?SERVER, {set_limit, Limit}, infinity).
get_limit() ->
- gen_server:call(?SERVER, get_limit, infinity).
+ gen_server2:call(?SERVER, get_limit, infinity).
info_keys() -> ?INFO_KEYS.
info() -> info(?INFO_KEYS).
-info(Items) -> gen_server:call(?SERVER, {info, Items}, infinity).
+info(Items) -> gen_server2:call(?SERVER, {info, Items}, infinity).
%%----------------------------------------------------------------------------
%% Internal functions
@@ -539,8 +549,8 @@ get_or_reopen(RefNewOrReopens) ->
{ok, [Handle || {_Ref, Handle} <- OpenHdls]};
{OpenHdls, ClosedHdls} ->
Oldest = oldest(get_age_tree(), fun () -> now() end),
- case gen_server:call(?SERVER, {open, self(), length(ClosedHdls),
- Oldest}, infinity) of
+ case gen_server2:call(?SERVER, {open, self(), length(ClosedHdls),
+ Oldest}, infinity) of
ok ->
case reopen(ClosedHdls) of
{ok, RefHdls} -> sort_handles(RefNewOrReopens,
@@ -567,10 +577,10 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
offset = Offset,
last_used_at = undefined }} |
RefNewOrReopenHdls] = ToOpen, Tree, RefHdls) ->
- case file:open(Path, case NewOrReopen of
- new -> Mode;
- reopen -> [read | Mode]
- end) of
+ case prim_file:open(Path, case NewOrReopen of
+ new -> Mode;
+ reopen -> [read | Mode]
+ end) of
{ok, Hdl} ->
Now = now(),
{{ok, _Offset}, Handle1} =
@@ -583,7 +593,7 @@ reopen([{Ref, NewOrReopen, Handle = #handle { hdl = closed,
Error ->
%% NB: none of the handles in ToOpen are in the age tree
Oldest = oldest(Tree, fun () -> undefined end),
- [gen_server:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
+ [gen_server2:cast(?SERVER, {close, self(), Oldest}) || _ <- ToOpen],
put_age_tree(Tree),
Error
end.
@@ -632,7 +642,7 @@ age_tree_delete(Then) ->
fun (Tree) ->
Tree1 = gb_trees:delete_any(Then, Tree),
Oldest = oldest(Tree1, fun () -> undefined end),
- gen_server:cast(?SERVER, {close, self(), Oldest}),
+ gen_server2:cast(?SERVER, {close, self(), Oldest}),
Tree1
end).
@@ -642,7 +652,7 @@ age_tree_change() ->
case gb_trees:is_empty(Tree) of
true -> Tree;
false -> {Oldest, _Ref} = gb_trees:smallest(Tree),
- gen_server:cast(?SERVER, {update, self(), Oldest})
+ gen_server2:cast(?SERVER, {update, self(), Oldest})
end,
Tree
end).
@@ -694,10 +704,10 @@ soft_close(Handle) ->
is_dirty = IsDirty,
last_used_at = Then } = Handle1 } ->
ok = case IsDirty of
- true -> file:sync(Hdl);
+ true -> prim_file:sync(Hdl);
false -> ok
end,
- ok = file:close(Hdl),
+ ok = prim_file:close(Hdl),
age_tree_delete(Then),
{ok, Handle1 #handle { hdl = closed,
is_dirty = false,
@@ -732,7 +742,7 @@ maybe_seek(NewOffset, Handle = #handle { hdl = Hdl, offset = Offset,
at_eof = AtEoF }) ->
{AtEoF1, NeedsSeek} = needs_seek(AtEoF, Offset, NewOffset),
case (case NeedsSeek of
- true -> file:position(Hdl, NewOffset);
+ true -> prim_file:position(Hdl, NewOffset);
false -> {ok, Offset}
end) of
{ok, Offset1} = Result ->
@@ -769,7 +779,7 @@ write_buffer(Handle = #handle { hdl = Hdl, offset = Offset,
write_buffer = WriteBuffer,
write_buffer_size = DataSize,
at_eof = true }) ->
- case file:write(Hdl, lists:reverse(WriteBuffer)) of
+ case prim_file:write(Hdl, lists:reverse(WriteBuffer)) of
ok ->
Offset1 = Offset + DataSize,
{ok, Handle #handle { offset = Offset1, is_dirty = true,
@@ -785,7 +795,7 @@ i(obtain_limit, #fhc_state{obtain_limit = Limit}) -> Limit;
i(Item, _) -> throw({bad_argument, Item}).
%%----------------------------------------------------------------------------
-%% gen_server callbacks
+%% gen_server2 callbacks
%%----------------------------------------------------------------------------
init([]) ->
@@ -814,6 +824,12 @@ init([]) ->
clients = Clients,
timer_ref = undefined }}.
+prioritise_cast(Msg, _State) ->
+ case Msg of
+ {release, _} -> 5;
+ _ -> 0
+ end.
+
handle_call({open, Pid, Requested, EldestUnusedSince}, From,
State = #fhc_state { open_count = Count,
open_pending = Pending,
@@ -893,6 +909,10 @@ handle_cast({update, Pid, EldestUnusedSince},
%% storm of messages
{noreply, State};
+handle_cast({release, Pid}, State) ->
+ {noreply, adjust_alarm(State, process_pending(
+ update_counts(obtain, Pid, -1, State)))};
+
handle_cast({close, Pid, EldestUnusedSince},
State = #fhc_state { elders = Elders, clients = Clients }) ->
true = case EldestUnusedSince of
@@ -1048,7 +1068,7 @@ run_pending_item(#pending { kind = Kind,
requested = Requested,
from = From },
State = #fhc_state { clients = Clients }) ->
- gen_server:reply(From, ok),
+ gen_server2:reply(From, ok),
true = ets:update_element(Clients, Pid, {#cstate.blocked, false}),
update_counts(Kind, Pid, Requested, State).
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 3e3117471f..47bc443303 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -232,12 +232,14 @@ start() ->
end.
stop() ->
+ rabbit_log:info("Stopping Rabbit~n"),
ok = rabbit_misc:stop_applications(application_load_order()).
stop_and_halt() ->
try
stop()
after
+ rabbit_misc:local_info_msg("Halting Erlang VM~n", []),
init:stop()
end,
ok.
@@ -264,6 +266,7 @@ environment() ->
rotate_logs(BinarySuffix) ->
Suffix = binary_to_list(BinarySuffix),
+ rabbit_misc:local_info_msg("Rotating logs with suffix '~s'~n", [Suffix]),
log_rotation_result(rotate_logs(log_location(kernel),
Suffix,
rabbit_error_logger_file_h),
@@ -461,20 +464,20 @@ insert_default_data() ->
ensure_working_log_handlers() ->
Handlers = gen_event:which_handlers(error_logger),
- ok = ensure_working_log_handler(error_logger_file_h,
+ ok = ensure_working_log_handler(error_logger_tty_h,
rabbit_error_logger_file_h,
error_logger_tty_h,
log_location(kernel),
Handlers),
- ok = ensure_working_log_handler(sasl_report_file_h,
+ ok = ensure_working_log_handler(sasl_report_tty_h,
rabbit_sasl_report_file_h,
sasl_report_tty_h,
log_location(sasl),
Handlers),
ok.
-ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
+ensure_working_log_handler(OldHandler, NewHandler, TTYHandler,
LogLocation, Handlers) ->
case LogLocation of
undefined -> ok;
@@ -484,10 +487,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
throw({error, {cannot_log_to_tty,
TTYHandler, not_installed}})
end;
- _ -> case lists:member(NewFHandler, Handlers) of
+ _ -> case lists:member(NewHandler, Handlers) of
true -> ok;
false -> case rotate_logs(LogLocation, "",
- OldFHandler, NewFHandler) of
+ OldHandler, NewHandler) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_log_to_file,
@@ -497,10 +500,10 @@ ensure_working_log_handler(OldFHandler, NewFHandler, TTYHandler,
end.
log_location(Type) ->
- case application:get_env(Type, case Type of
- kernel -> error_logger;
- sasl -> sasl_error_logger
- end) of
+ case application:get_env(rabbit, case Type of
+ kernel -> error_logger;
+ sasl -> sasl_error_logger
+ end) of
{ok, {file, File}} -> File;
{ok, false} -> undefined;
{ok, tty} -> tty;
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index c28cd5bf71..e3a2ca9010 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -42,7 +42,6 @@
backing_queue,
backing_queue_state,
active_consumers,
- blocked_consumers,
expires,
sync_timer_ref,
rate_timer_ref,
@@ -60,6 +59,7 @@
monitor_ref,
acktags,
consumer_count,
+ blocked_consumers,
limiter,
is_limit_active,
unsent_message_count}).
@@ -124,7 +124,6 @@ init(Q) ->
backing_queue = backing_queue_module(Q),
backing_queue_state = undefined,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = undefined,
@@ -150,7 +149,6 @@ init_with_backing_queue_state(Q = #amqqueue{exclusive_owner = Owner}, BQ, BQS,
backing_queue = BQ,
backing_queue_state = BQS,
active_consumers = queue:new(),
- blocked_consumers = queue:new(),
expires = undefined,
sync_timer_ref = undefined,
rate_timer_ref = RateTRef,
@@ -340,6 +338,7 @@ ch_record(ChPid) ->
monitor_ref = MonitorRef,
acktags = sets:new(),
consumer_count = 0,
+ blocked_consumers = queue:new(),
is_limit_active = false,
limiter = rabbit_limiter:make_token(),
unsent_message_count = 0},
@@ -381,6 +380,9 @@ update_consumer_count(C = #cr{consumer_count = Count}, Delta) ->
all_ch_record() -> [C || {{ch, _}, C} <- get()].
+block_consumer(C = #cr{blocked_consumers = Blocked}, QEntry) ->
+ update_ch_record(C#cr{blocked_consumers = queue:in(QEntry, Blocked)}).
+
is_ch_blocked(#cr{unsent_message_count = Count, is_limit_active = Limited}) ->
Limited orelse Count >= ?UNSENT_MESSAGE_LIMIT.
@@ -392,67 +394,56 @@ ch_record_state_transition(OldCR, NewCR) ->
end.
deliver_msgs_to_consumers(Funs = {PredFun, DeliverFun}, FunAcc,
- State = #q{q = #amqqueue{name = QName},
- active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
- case queue:out(ActiveConsumers) of
- {{value, QEntry = {ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}},
- ActiveConsumersTail} ->
- C = #cr{limiter = Limiter,
- unsent_message_count = Count,
- acktags = ChAckTags} = ch_record(ChPid),
- IsMsgReady = PredFun(FunAcc, State),
- case (IsMsgReady andalso
- rabbit_limiter:can_send(Limiter, self(), AckRequired)) of
- true ->
- {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
- DeliverFun(AckRequired, FunAcc, State),
- rabbit_channel:deliver(
- ChPid, ConsumerTag, AckRequired,
- {QName, self(), AckTag, IsDelivered, Message}),
- ChAckTags1 =
- case AckRequired of
- true -> sets:add_element(AckTag, ChAckTags);
- false -> ChAckTags
- end,
- NewC = update_ch_record(
- C#cr{unsent_message_count = Count + 1,
- acktags = ChAckTags1}),
- {NewActiveConsumers, NewBlockedConsumers} =
- case ch_record_state_transition(C, NewC) of
- ok -> {queue:in(QEntry, ActiveConsumersTail),
- BlockedConsumers};
- block -> {ActiveConsumers1, BlockedConsumers1} =
- move_consumers(ChPid,
- ActiveConsumersTail,
- BlockedConsumers),
- {ActiveConsumers1,
- queue:in(QEntry, BlockedConsumers1)}
- end,
- State2 = State1#q{
- active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers},
- deliver_msgs_to_consumers(Funs, FunAcc1, State2);
- %% if IsMsgReady then we've hit the limiter
- false when IsMsgReady ->
- update_ch_record(C#cr{is_limit_active = true}),
- {NewActiveConsumers, NewBlockedConsumers} =
- move_consumers(ChPid,
- ActiveConsumers,
- BlockedConsumers),
- deliver_msgs_to_consumers(
- Funs, FunAcc,
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers});
- false ->
- %% no message was ready, so we don't need to block anyone
- {FunAcc, State}
- end;
- {empty, _} ->
- {FunAcc, State}
+ State = #q{active_consumers = ActiveConsumers}) ->
+ case PredFun(FunAcc, State) of
+ false -> {FunAcc, State};
+ true -> case queue:out(ActiveConsumers) of
+ {empty, _} ->
+ {FunAcc, State};
+ {{value, QEntry}, Tail} ->
+ {FunAcc1, State1} =
+ deliver_msg_to_consumer(
+ DeliverFun, QEntry,
+ FunAcc, State#q{active_consumers = Tail}),
+ deliver_msgs_to_consumers(Funs, FunAcc1, State1)
+ end
+ end.
+
+deliver_msg_to_consumer(DeliverFun, E = {ChPid, Consumer}, FunAcc, State) ->
+ C = ch_record(ChPid),
+ case is_ch_blocked(C) of
+ true -> block_consumer(C, E),
+ {FunAcc, State};
+ false -> case rabbit_limiter:can_send(C#cr.limiter, self(),
+ Consumer#consumer.ack_required) of
+ false -> block_consumer(C#cr{is_limit_active = true}, E),
+ {FunAcc, State};
+ true -> AC1 = queue:in(E, State#q.active_consumers),
+ deliver_msg_to_consumer(
+ DeliverFun, Consumer, C, FunAcc,
+ State#q{active_consumers = AC1})
+ end
end.
+deliver_msg_to_consumer(DeliverFun,
+ #consumer{tag = ConsumerTag,
+ ack_required = AckRequired},
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ unsent_message_count = Count},
+ FunAcc, State = #q{q = #amqqueue{name = QName}}) ->
+ {{Message, IsDelivered, AckTag}, FunAcc1, State1} =
+ DeliverFun(AckRequired, FunAcc, State),
+ rabbit_channel:deliver(ChPid, ConsumerTag, AckRequired,
+ {QName, self(), AckTag, IsDelivered, Message}),
+ ChAckTags1 = case AckRequired of
+ true -> sets:add_element(AckTag, ChAckTags);
+ false -> ChAckTags
+ end,
+ update_ch_record(C#cr{acktags = ChAckTags1,
+ unsent_message_count = Count + 1}),
+ {FunAcc1, State1}.
+
deliver_from_queue_pred(IsEmpty, _State) -> not IsEmpty.
deliver_from_queue_deliver(AckRequired, false, State) ->
@@ -589,43 +580,34 @@ fetch(AckRequired, State = #q{backing_queue_state = BQS,
{Result, BQS1} = BQ:fetch(AckRequired, BQS),
{Result, State#q{backing_queue_state = BQS1}}.
-add_consumer(ChPid, Consumer, Queue) -> queue:in({ChPid, Consumer}, Queue).
-
remove_consumer(ChPid, ConsumerTag, Queue) ->
queue:filter(fun ({CP, #consumer{tag = CTag}}) ->
(CP /= ChPid) or (CTag /= ConsumerTag)
end, Queue).
remove_consumers(ChPid, Queue) ->
- {Kept, Removed} = split_by_channel(ChPid, Queue),
- [emit_consumer_deleted(Ch, CTag) ||
- {Ch, #consumer{tag = CTag}} <- queue:to_list(Removed)],
- Kept.
-
-move_consumers(ChPid, From, To) ->
- {Kept, Removed} = split_by_channel(ChPid, From),
- {Kept, queue:join(To, Removed)}.
-
-split_by_channel(ChPid, Queue) ->
- {Kept, Removed} = lists:partition(fun ({CP, _}) -> CP /= ChPid end,
- queue:to_list(Queue)),
- {queue:from_list(Kept), queue:from_list(Removed)}.
+ queue:filter(fun ({CP, #consumer{tag = CTag}}) when CP =:= ChPid ->
+ emit_consumer_deleted(ChPid, CTag),
+ false;
+ (_) ->
+ true
+ end, Queue).
possibly_unblock(State, ChPid, Update) ->
case lookup_ch(ChPid) of
not_found ->
State;
C ->
- NewC = update_ch_record(Update(C)),
- case ch_record_state_transition(C, NewC) of
- ok -> State;
- unblock -> {NewBlockedConsumers, NewActiveConsumers} =
- move_consumers(ChPid,
- State#q.blocked_consumers,
- State#q.active_consumers),
- run_message_queue(
- State#q{active_consumers = NewActiveConsumers,
- blocked_consumers = NewBlockedConsumers})
+ C1 = Update(C),
+ case ch_record_state_transition(C, C1) of
+ ok -> update_ch_record(C1),
+ State;
+ unblock -> #cr{blocked_consumers = Consumers} = C1,
+ update_ch_record(
+ C1#cr{blocked_consumers = queue:new()}),
+ AC1 = queue:join(State#q.active_consumers,
+ Consumers),
+ run_message_queue(State#q{active_consumers = AC1})
end
end.
@@ -637,7 +619,10 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
case lookup_ch(DownPid) of
not_found ->
{ok, State};
- C = #cr{ch_pid = ChPid, acktags = ChAckTags} ->
+ C = #cr{ch_pid = ChPid,
+ acktags = ChAckTags,
+ blocked_consumers = Blocked} ->
+ _ = remove_consumers(ChPid, Blocked), %% for stats emission
ok = erase_ch_record(C),
State1 = State#q{
exclusive_consumer = case Holder of
@@ -645,9 +630,7 @@ handle_ch_down(DownPid, State = #q{exclusive_consumer = Holder}) ->
Other -> Other
end,
active_consumers = remove_consumers(
- ChPid, State#q.active_consumers),
- blocked_consumers = remove_consumers(
- ChPid, State#q.blocked_consumers)},
+ ChPid, State#q.active_consumers)},
case should_auto_delete(State1) of
true -> {stop, State1};
false -> {ok, requeue_and_run(sets:to_list(ChAckTags),
@@ -665,8 +648,15 @@ check_exclusive_access(none, true, State) ->
false -> in_use
end.
-is_unused(State) -> queue:is_empty(State#q.active_consumers) andalso
- queue:is_empty(State#q.blocked_consumers).
+consumer_count() -> consumer_count(fun (_) -> false end).
+
+active_consumer_count() -> consumer_count(fun is_ch_blocked/1).
+
+consumer_count(Exclude) ->
+ lists:sum([Count || C = #cr{consumer_count = Count} <- all_ch_record(),
+ not Exclude(C)]).
+
+is_unused(_State) -> consumer_count() == 0.
maybe_send_reply(_ChPid, undefined) -> ok;
maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
@@ -792,8 +782,8 @@ i(messages_unacknowledged, _) ->
i(messages, State) ->
lists:sum([i(Item, State) || Item <- [messages_ready,
messages_unacknowledged]]);
-i(consumers, State) ->
- queue:len(State#q.active_consumers) + queue:len(State#q.blocked_consumers);
+i(consumers, _) ->
+ consumer_count();
i(memory, _) ->
{memory, M} = process_info(self(), memory),
M;
@@ -809,13 +799,15 @@ i(backing_queue_status, #q{backing_queue_state = BQS, backing_queue = BQ}) ->
i(Item, _) ->
throw({bad_argument, Item}).
-consumers(#q{active_consumers = ActiveConsumers,
- blocked_consumers = BlockedConsumers}) ->
+consumers(#q{active_consumers = ActiveConsumers}) ->
+ lists:foldl(fun (C, Acc) -> consumers(C#cr.blocked_consumers, Acc) end,
+ consumers(ActiveConsumers, []), all_ch_record()).
+
+consumers(Consumers, Acc) ->
rabbit_misc:queue_fold(
- fun ({ChPid, #consumer{tag = ConsumerTag,
- ack_required = AckRequired}}, Acc) ->
- [{ChPid, ConsumerTag, AckRequired} | Acc]
- end, [], queue:join(ActiveConsumers, BlockedConsumers)).
+ fun ({ChPid, #consumer{tag = CTag, ack_required = AckRequired}}, Acc1) ->
+ [{ChPid, CTag, AckRequired} | Acc1]
+ end, Acc, Consumers).
emit_stats(State) ->
emit_stats(State, []).
@@ -982,17 +974,14 @@ handle_call({basic_consume, NoAck, ChPid, Limiter,
State1 = State#q{has_had_consumers = true,
exclusive_consumer = ExclusiveConsumer},
ok = maybe_send_reply(ChPid, OkMsg),
+ E = {ChPid, Consumer},
State2 =
case is_ch_blocked(C1) of
- true -> State1#q{
- blocked_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.blocked_consumers)};
- false -> run_message_queue(
- State1#q{
- active_consumers =
- add_consumer(ChPid, Consumer,
- State1#q.active_consumers)})
+ true -> block_consumer(C1, E),
+ State1;
+ false -> update_ch_record(C1),
+ AC1 = queue:in(E, State1#q.active_consumers),
+ run_message_queue(State1#q{active_consumers = AC1})
end,
emit_consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
not NoAck),
@@ -1005,9 +994,10 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
case lookup_ch(ChPid) of
not_found ->
reply(ok, State);
- C ->
- update_consumer_count(C, -1),
+ C = #cr{blocked_consumers = Blocked} ->
emit_consumer_deleted(ChPid, ConsumerTag),
+ Blocked1 = remove_consumer(ChPid, ConsumerTag, Blocked),
+ update_consumer_count(C#cr{blocked_consumers = Blocked1}, -1),
State1 = State#q{
exclusive_consumer = case Holder of
{ChPid, ConsumerTag} -> none;
@@ -1015,10 +1005,7 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end,
active_consumers = remove_consumer(
ChPid, ConsumerTag,
- State#q.active_consumers),
- blocked_consumers = remove_consumer(
- ChPid, ConsumerTag,
- State#q.blocked_consumers)},
+ State#q.active_consumers)},
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> {stop, normal, ok, State1}
@@ -1026,10 +1013,9 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
end;
handle_call(stat, _From, State) ->
- State1 = #q{backing_queue = BQ, backing_queue_state = BQS,
- active_consumers = ActiveConsumers} =
+ State1 = #q{backing_queue = BQ, backing_queue_state = BQS} =
drop_expired_messages(ensure_expiry_timer(State)),
- reply({ok, BQ:len(BQS), queue:len(ActiveConsumers)}, State1);
+ reply({ok, BQ:len(BQS), active_consumer_count()}, State1);
handle_call({delete, IfUnused, IfEmpty}, _From,
State = #q{backing_queue_state = BQS, backing_queue = BQ}) ->
diff --git a/src/rabbit_auth_backend_internal.erl b/src/rabbit_auth_backend_internal.erl
index 156d98dc37..086a90b49f 100644
--- a/src/rabbit_auth_backend_internal.erl
+++ b/src/rabbit_auth_backend_internal.erl
@@ -146,6 +146,7 @@ permission_index(read) -> #permission.read.
%% Manipulation of the user database
add_user(Username, Password) ->
+ rabbit_log:info("Creating user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_user, Username}) of
@@ -161,10 +162,10 @@ add_user(Username, Password) ->
mnesia:abort({user_already_exists, Username})
end
end),
- rabbit_log:info("Created user ~p~n", [Username]),
R.
delete_user(Username) ->
+ rabbit_log:info("Deleting user '~s'~n", [Username]),
R = rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user(
Username,
@@ -181,13 +182,14 @@ delete_user(Username) ->
write)],
ok
end)),
- rabbit_log:info("Deleted user ~p~n", [Username]),
R.
change_password(Username, Password) ->
+ rabbit_log:info("Changing password for '~s'~n", [Username]),
change_password_hash(Username, hash_password(Password)).
clear_password(Username) ->
+ rabbit_log:info("Clearing password for '~s'~n", [Username]),
change_password_hash(Username, <<"">>).
change_password_hash(Username, PasswordHash) ->
@@ -195,7 +197,6 @@ change_password_hash(Username, PasswordHash) ->
User#internal_user{
password_hash = PasswordHash }
end),
- rabbit_log:info("Changed password for user ~p~n", [Username]),
R.
hash_password(Cleartext) ->
@@ -217,11 +218,10 @@ salted_md5(Salt, Cleartext) ->
erlang:md5(Salted).
set_tags(Username, Tags) ->
+ rabbit_log:info("Setting user tags for user '~s' to ~p~n", [Username, Tags]),
R = update_user(Username, fun(User) ->
User#internal_user{tags = Tags}
end),
- rabbit_log:info("Set user tags for user ~p to ~p~n",
- [Username, Tags]),
R.
update_user(Username, Fun) ->
@@ -251,6 +251,8 @@ validate_regexp(RegexpBin) ->
end.
set_permissions(Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm) ->
+ rabbit_log:info("Setting permissions for '~s' in '~s' to '~s', '~s', '~s'~n",
+ [Username, VHostPath, ConfigurePerm, WritePerm, ReadPerm]),
lists:map(fun validate_regexp/1, [ConfigurePerm, WritePerm, ReadPerm]),
rabbit_misc:execute_mnesia_transaction(
rabbit_misc:with_user_and_vhost(
diff --git a/src/rabbit_binding.erl b/src/rabbit_binding.erl
index 43c26941da..e625a427e4 100644
--- a/src/rabbit_binding.erl
+++ b/src/rabbit_binding.erl
@@ -40,7 +40,7 @@
'source_and_destination_not_found')).
-type(bind_ok_or_error() :: 'ok' | bind_errors() |
rabbit_types:error('binding_not_found')).
--type(bind_res() :: bind_ok_or_error() | rabbit_misc:const(bind_ok_or_error())).
+-type(bind_res() :: bind_ok_or_error() | rabbit_misc:thunk(bind_ok_or_error())).
-type(inner_fun() ::
fun((rabbit_types:exchange(),
rabbit_types:exchange() | rabbit_types:amqqueue()) ->
diff --git a/src/rabbit_error_logger_file_h.erl b/src/rabbit_error_logger_file_h.erl
index 7e9ebc4fa2..7b6e07c19f 100644
--- a/src/rabbit_error_logger_file_h.erl
+++ b/src/rabbit_error_logger_file_h.erl
@@ -26,11 +26,16 @@
%% with the result of closing the old handler when swapping handlers.
%% The first init/1 additionally allows for simple log rotation
%% when the suffix is not the empty string.
+%% The original init/2 also opened the file in 'write' mode, thus
+%% overwriting old logs. To remedy this, init/2 from
+%% lib/stdlib/src/error_logger_file_h.erl from R14B3 was copied as
+%% init_file/2 and changed so that it opens the file in 'append' mode.
%% Used only when swapping handlers in log rotation
init({{File, Suffix}, []}) ->
- case rabbit_misc:append_file(File, Suffix) of
- ok -> ok;
+ case rabbit_file:append_file(File, Suffix) of
+ ok -> file:delete(File),
+ ok;
{error, Error} ->
rabbit_log:error("Failed to append contents of "
"log file '~s' to '~s':~n~p~n",
@@ -45,12 +50,31 @@ init({{File, _}, error}) ->
%% log rotation
init({File, []}) ->
init(File);
-init({File, _Type} = FileInfo) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- error_logger_file_h:init(FileInfo);
+%% Used only when taking over from the tty handler
+init({{File, []}, _}) ->
+ init(File);
+init({File, {error_logger, Buf}}) ->
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(File, {error_logger, Buf});
init(File) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- error_logger_file_h:init(File).
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(File, []).
+
+init_file(File, {error_logger, Buf}) ->
+ case init_file(File, error_logger) of
+ {ok, {Fd, File, PrevHandler}} ->
+ [handle_event(Event, {Fd, File, PrevHandler}) ||
+ {_, Event} <- lists:reverse(Buf)],
+ {ok, {Fd, File, PrevHandler}};
+ Error ->
+ Error
+ end;
+init_file(File, PrevHandler) ->
+ process_flag(trap_exit, true),
+ case file:open(File, [append]) of
+ {ok,Fd} -> {ok, {Fd, File, PrevHandler}};
+ Error -> Error
+ end.
handle_event(Event, State) ->
error_logger_file_h:handle_event(Event, State).
diff --git a/src/rabbit_file.erl b/src/rabbit_file.erl
new file mode 100644
index 0000000000..5cb8e7b69d
--- /dev/null
+++ b/src/rabbit_file.erl
@@ -0,0 +1,282 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is VMware, Inc.
+%% Copyright (c) 2011 VMware, Inc. All rights reserved.
+%%
+
+-module(rabbit_file).
+
+-include_lib("kernel/include/file.hrl").
+
+-export([is_file/1, is_dir/1, file_size/1, ensure_dir/1, wildcard/2, list_dir/1]).
+-export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
+-export([append_file/2, ensure_parent_dirs_exist/1]).
+-export([rename/2, delete/1, recursive_delete/1, recursive_copy/2]).
+-export([lock_file/1]).
+
+%%----------------------------------------------------------------------------
+
+-ifdef(use_specs).
+
+-type(ok_or_error() :: rabbit_types:ok_or_error(any())).
+
+-spec(is_file/1 :: ((file:filename())) -> boolean()).
+-spec(is_dir/1 :: ((file:filename())) -> boolean()).
+-spec(file_size/1 :: ((file:filename())) -> non_neg_integer()).
+-spec(ensure_dir/1 :: ((file:filename())) -> ok_or_error()).
+-spec(wildcard/2 :: (string(), file:filename()) -> [file:filename()]).
+-spec(list_dir/1 :: (file:filename()) -> rabbit_types:ok_or_error2(
+ [file:filename()], any())).
+-spec(read_term_file/1 ::
+ (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
+-spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
+-spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
+-spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
+-spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
+-spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
+-spec(rename/2 ::
+ (file:filename(), file:filename()) -> ok_or_error()).
+-spec(delete/1 :: ([file:filename()]) -> ok_or_error()).
+-spec(recursive_delete/1 ::
+ ([file:filename()])
+ -> rabbit_types:ok_or_error({file:filename(), any()})).
+-spec(recursive_copy/2 ::
+ (file:filename(), file:filename())
+ -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
+-spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
+
+-endif.
+
+%%----------------------------------------------------------------------------
+
+is_file(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{type=regular}} -> true;
+ {ok, #file_info{type=directory}} -> true;
+ _ -> false
+ end.
+
+is_dir(Dir) -> is_dir_internal(read_file_info(Dir)).
+
+is_dir_no_handle(Dir) -> is_dir_internal(prim_file:read_file_info(Dir)).
+
+is_dir_internal({ok, #file_info{type=directory}}) -> true;
+is_dir_internal(_) -> false.
+
+file_size(File) ->
+ case read_file_info(File) of
+ {ok, #file_info{size=Size}} -> Size;
+ _ -> 0
+ end.
+
+ensure_dir(File) -> with_fhc_handle(fun () -> ensure_dir_internal(File) end).
+
+ensure_dir_internal("/") ->
+ ok;
+ensure_dir_internal(File) ->
+ Dir = filename:dirname(File),
+ case is_dir_no_handle(Dir) of
+ true -> ok;
+ false -> ensure_dir_internal(Dir),
+ prim_file:make_dir(Dir)
+ end.
+
+wildcard(Pattern, Dir) ->
+ {ok, Files} = list_dir(Dir),
+ {ok, RE} = re:compile(Pattern, [anchored]),
+ [File || File <- Files, match =:= re:run(File, RE, [{capture, none}])].
+
+list_dir(Dir) -> with_fhc_handle(fun () -> prim_file:list_dir(Dir) end).
+
+read_file_info(File) ->
+ with_fhc_handle(fun () -> prim_file:read_file_info(File) end).
+
+with_fhc_handle(Fun) ->
+ ok = file_handle_cache:obtain(),
+ try Fun()
+ after ok = file_handle_cache:release()
+ end.
+
+read_term_file(File) ->
+ try
+ {ok, Data} = with_fhc_handle(fun () -> prim_file:read_file(File) end),
+ {ok, Tokens, _} = erl_scan:string(binary_to_list(Data)),
+ TokenGroups = group_tokens(Tokens),
+ {ok, [begin
+ {ok, Term} = erl_parse:parse_term(Tokens1),
+ Term
+ end || Tokens1 <- TokenGroups]}
+ catch
+ error:{badmatch, Error} -> Error
+ end.
+
+group_tokens(Ts) -> [lists:reverse(G) || G <- group_tokens([], Ts)].
+
+group_tokens([], []) -> [];
+group_tokens(Cur, []) -> [Cur];
+group_tokens(Cur, [T = {dot, _} | Ts]) -> [[T | Cur] | group_tokens([], Ts)];
+group_tokens(Cur, [T | Ts]) -> group_tokens([T | Cur], Ts).
+
+write_term_file(File, Terms) ->
+ write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
+ Term <- Terms])).
+
+write_file(Path, Data) -> write_file(Path, Data, []).
+
+%% write_file/3 and make_binary/1 are both based on corresponding
+%% functions in the kernel/file.erl module of the Erlang R14B02
+%% release, which is licensed under the EPL. That implementation of
+%% write_file/3 does not do an fsync prior to closing the file, hence
+%% the existence of this version. APIs are otherwise identical.
+write_file(Path, Data, Modes) ->
+ Modes1 = [binary, write | (Modes -- [binary, write])],
+ case make_binary(Data) of
+ Bin when is_binary(Bin) ->
+ with_fhc_handle(
+ fun () -> case prim_file:open(Path, Modes1) of
+ {ok, Hdl} -> try prim_file:write(Hdl, Bin) of
+ ok -> prim_file:sync(Hdl);
+ {error, _} = E -> E
+ after
+ prim_file:close(Hdl)
+ end;
+ {error, _} = E -> E
+ end
+ end);
+ {error, _} = E -> E
+ end.
+
+make_binary(Bin) when is_binary(Bin) ->
+ Bin;
+make_binary(List) ->
+ try
+ iolist_to_binary(List)
+ catch error:Reason ->
+ {error, Reason}
+ end.
+
+
+append_file(File, Suffix) ->
+ case read_file_info(File) of
+ {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
+ {error, enoent} -> append_file(File, 0, Suffix);
+ Error -> Error
+ end.
+
+append_file(_, _, "") ->
+ ok;
+append_file(File, 0, Suffix) ->
+ with_fhc_handle(fun () ->
+ case prim_file:open([File, Suffix], [append]) of
+ {ok, Fd} -> prim_file:close(Fd);
+ Error -> Error
+ end
+ end);
+append_file(File, _, Suffix) ->
+ case with_fhc_handle(fun () -> prim_file:read_file(File) end) of
+ {ok, Data} -> write_file([File, Suffix], Data, [append]);
+ Error -> Error
+ end.
+
+ensure_parent_dirs_exist(Filename) ->
+ case ensure_dir(Filename) of
+ ok -> ok;
+ {error, Reason} ->
+ throw({error, {cannot_create_parent_dirs, Filename, Reason}})
+ end.
+
+rename(Old, New) -> with_fhc_handle(fun () -> prim_file:rename(Old, New) end).
+
+delete(File) -> with_fhc_handle(fun () -> prim_file:delete(File) end).
+
+recursive_delete(Files) ->
+ with_fhc_handle(
+ fun () -> lists:foldl(fun (Path, ok) -> recursive_delete1(Path);
+ (_Path, {error, _Err} = Error) -> Error
+ end, ok, Files)
+ end).
+
+recursive_delete1(Path) ->
+ case is_dir_no_handle(Path) and not(is_symlink_no_handle(Path)) of
+ false -> case prim_file:delete(Path) of
+ ok -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ true -> case prim_file:list_dir(Path) of
+ {ok, FileNames} ->
+ case lists:foldl(
+ fun (FileName, ok) ->
+ recursive_delete1(
+ filename:join(Path, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames) of
+ ok ->
+ case prim_file:del_dir(Path) of
+ ok -> ok;
+ {error, Err} -> {error, {Path, Err}}
+ end;
+ {error, _Err} = Error ->
+ Error
+ end;
+ {error, Err} ->
+ {error, {Path, Err}}
+ end
+ end.
+
+is_symlink_no_handle(File) ->
+ case prim_file:read_link(File) of
+ {ok, _} -> true;
+ _ -> false
+ end.
+
+recursive_copy(Src, Dest) ->
+ %% Note that this uses the 'file' module and, hence, shouldn't be
+ %% run on many processes at once.
+ case is_dir(Src) of
+ false -> case file:copy(Src, Dest) of
+ {ok, _Bytes} -> ok;
+ {error, enoent} -> ok; %% Path doesn't exist anyway
+ {error, Err} -> {error, {Src, Dest, Err}}
+ end;
+ true -> case file:list_dir(Src) of
+ {ok, FileNames} ->
+ case file:make_dir(Dest) of
+ ok ->
+ lists:foldl(
+ fun (FileName, ok) ->
+ recursive_copy(
+ filename:join(Src, FileName),
+ filename:join(Dest, FileName));
+ (_FileName, Error) ->
+ Error
+ end, ok, FileNames);
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end;
+ {error, Err} ->
+ {error, {Src, Dest, Err}}
+ end
+ end.
+
+%% TODO: When we stop supporting Erlang prior to R14, this should be
+%% replaced with file:open [write, exclusive]
+lock_file(Path) ->
+ case is_file(Path) of
+ true -> {error, eexist};
+ false -> with_fhc_handle(
+ fun () -> {ok, Lock} = prim_file:open(Path, [write]),
+ ok = prim_file:close(Lock)
+ end)
+ end.
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 234bc55be5..cf3fea1a53 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -52,13 +52,13 @@ start_link() ->
update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
- Serial = case rabbit_misc:read_term_file(Filename) of
+ Serial = case rabbit_file:read_term_file(Filename) of
{ok, [Num]} -> Num;
{error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
- case rabbit_misc:write_term_file(Filename, [Serial + 1]) of
+ case rabbit_file:write_term_file(Filename, [Serial + 1]) of
ok -> ok;
{error, Reason1} ->
throw({error, {cannot_write_serial_file, Filename, Reason1}})
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 0b39a20927..f2dc97fdcb 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -18,8 +18,6 @@
-include("rabbit.hrl").
-include("rabbit_framing.hrl").
--include_lib("kernel/include/file.hrl").
-
-export([method_record_type/1, polite_pause/0, polite_pause/1]).
-export([die/1, frame_error/2, amqp_error/4,
protocol_error/3, protocol_error/4, protocol_error/1]).
@@ -40,19 +38,16 @@
-export([upmap/2, map_in_order/2]).
-export([table_filter/3]).
-export([dirty_read_all/1, dirty_foreach_key/2, dirty_dump_log/1]).
--export([read_term_file/1, write_term_file/2, write_file/2, write_file/3]).
--export([append_file/2, ensure_parent_dirs_exist/1]).
--export([format_stderr/2, with_local_io/1]).
+-export([format_stderr/2, with_local_io/1, local_info_msg/2]).
-export([start_applications/1, stop_applications/1]).
-export([unfold/2, ceil/1, queue_fold/3]).
-export([sort_field_table/1]).
-export([pid_to_string/1, string_to_pid/1]).
-export([version_compare/2, version_compare/3]).
--export([recursive_delete/1, recursive_copy/2, dict_cons/3, orddict_cons/3]).
+-export([dict_cons/3, orddict_cons/3]).
-export([get_options/2]).
-export([all_module_attributes/1, build_acyclic_graph/3]).
-export([now_ms/0]).
--export([lock_file/1]).
-export([const_ok/0, const/1]).
-export([ntoa/1, ntoab/1]).
-export([is_process_alive/1]).
@@ -158,15 +153,9 @@
-spec(dirty_foreach_key/2 :: (fun ((any()) -> any()), atom())
-> 'ok' | 'aborted').
-spec(dirty_dump_log/1 :: (file:filename()) -> ok_or_error()).
--spec(read_term_file/1 ::
- (file:filename()) -> {'ok', [any()]} | rabbit_types:error(any())).
--spec(write_term_file/2 :: (file:filename(), [any()]) -> ok_or_error()).
--spec(write_file/2 :: (file:filename(), iodata()) -> ok_or_error()).
--spec(write_file/3 :: (file:filename(), iodata(), [any()]) -> ok_or_error()).
--spec(append_file/2 :: (file:filename(), string()) -> ok_or_error()).
--spec(ensure_parent_dirs_exist/1 :: (string()) -> 'ok').
-spec(format_stderr/2 :: (string(), [any()]) -> 'ok').
-spec(with_local_io/1 :: (fun (() -> A)) -> A).
+-spec(local_info_msg/2 :: (string(), [any()]) -> 'ok').
-spec(start_applications/1 :: ([atom()]) -> 'ok').
-spec(stop_applications/1 :: ([atom()]) -> 'ok').
-spec(unfold/2 :: (fun ((A) -> ({'true', B, A} | 'false')), A) -> {[B], A}).
@@ -180,12 +169,6 @@
-spec(version_compare/3 ::
(string(), string(), ('lt' | 'lte' | 'eq' | 'gte' | 'gt'))
-> boolean()).
--spec(recursive_delete/1 ::
- ([file:filename()])
- -> rabbit_types:ok_or_error({file:filename(), any()})).
--spec(recursive_copy/2 ::
- (file:filename(), file:filename())
- -> rabbit_types:ok_or_error({file:filename(), file:filename(), any()})).
-spec(dict_cons/3 :: (any(), any(), dict()) -> dict()).
-spec(orddict_cons/3 :: (any(), any(), orddict:orddict()) -> orddict:orddict()).
-spec(get_options/2 :: ([optdef()], [string()])
@@ -199,7 +182,6 @@
{bad_edge, [digraph:vertex()]}),
digraph:vertex(), digraph:vertex()})).
-spec(now_ms/0 :: () -> non_neg_integer()).
--spec(lock_file/1 :: (file:filename()) -> rabbit_types:ok_or_error('eexist')).
-spec(const_ok/0 :: () -> 'ok').
-spec(const/1 :: (A) -> thunk(A)).
-spec(ntoa/1 :: (inet:ip_address()) -> string()).
@@ -525,74 +507,6 @@ dirty_dump_log1(LH, {K, Terms, BadBytes}) ->
io:format("Bad Chunk, ~p: ~p~n", [BadBytes, Terms]),
dirty_dump_log1(LH, disk_log:chunk(LH, K)).
-
-read_term_file(File) -> file:consult(File).
-
-write_term_file(File, Terms) ->
- write_file(File, list_to_binary([io_lib:format("~w.~n", [Term]) ||
- Term <- Terms])).
-
-write_file(Path, Data) ->
- write_file(Path, Data, []).
-
-%% write_file/3 and make_binary/1 are both based on corresponding
-%% functions in the kernel/file.erl module of the Erlang R14B02
-%% release, which is licensed under the EPL. That implementation of
-%% write_file/3 does not do an fsync prior to closing the file, hence
-%% the existence of this version. APIs are otherwise identical.
-write_file(Path, Data, Modes) ->
- Modes1 = [binary, write | (Modes -- [binary, write])],
- case make_binary(Data) of
- Bin when is_binary(Bin) ->
- case file:open(Path, Modes1) of
- {ok, Hdl} -> try file:write(Hdl, Bin) of
- ok -> file:sync(Hdl);
- {error, _} = E -> E
- after
- file:close(Hdl)
- end;
- {error, _} = E -> E
- end;
- {error, _} = E -> E
- end.
-
-make_binary(Bin) when is_binary(Bin) ->
- Bin;
-make_binary(List) ->
- try
- iolist_to_binary(List)
- catch error:Reason ->
- {error, Reason}
- end.
-
-
-append_file(File, Suffix) ->
- case file:read_file_info(File) of
- {ok, FInfo} -> append_file(File, FInfo#file_info.size, Suffix);
- {error, enoent} -> append_file(File, 0, Suffix);
- Error -> Error
- end.
-
-append_file(_, _, "") ->
- ok;
-append_file(File, 0, Suffix) ->
- case file:open([File, Suffix], [append]) of
- {ok, Fd} -> file:close(Fd);
- Error -> Error
- end;
-append_file(File, _, Suffix) ->
- case file:read_file(File) of
- {ok, Data} -> write_file([File, Suffix], Data, [append]);
- Error -> Error
- end.
-
-ensure_parent_dirs_exist(Filename) ->
- case filelib:ensure_dir(Filename) of
- ok -> ok;
- {error, Reason} ->
- throw({error, {cannot_create_parent_dirs, Filename, Reason}})
- end.
-
format_stderr(Fmt, Args) ->
case os:type() of
{unix, _} ->
@@ -619,6 +533,12 @@ with_local_io(Fun) ->
group_leader(GL, self())
end.
+%% Log an info message on the local node using the standard logger.
+%% Use this if rabbit isn't running and the call didn't originate on
+%% the local node (e.g. rabbitmqctl calls).
+local_info_msg(Format, Args) ->
+ with_local_io(fun () -> error_logger:info_msg(Format, Args) end).
+
manage_applications(Iterate, Do, Undo, SkipError, ErrorTag, Apps) ->
Iterate(fun (App, Acc) ->
case Do(App) of
@@ -743,73 +663,6 @@ version_compare(A, B) ->
dropdot(A) -> lists:dropwhile(fun (X) -> X =:= $. end, A).
-recursive_delete(Files) ->
- lists:foldl(fun (Path, ok ) -> recursive_delete1(Path);
- (_Path, {error, _Err} = Error) -> Error
- end, ok, Files).
-
-recursive_delete1(Path) ->
- case filelib:is_dir(Path) and not(is_symlink(Path)) of
- false -> case file:delete(Path) of
- ok -> ok;
- {error, enoent} -> ok; %% Path doesn't exist anyway
- {error, Err} -> {error, {Path, Err}}
- end;
- true -> case file:list_dir(Path) of
- {ok, FileNames} ->
- case lists:foldl(
- fun (FileName, ok) ->
- recursive_delete1(
- filename:join(Path, FileName));
- (_FileName, Error) ->
- Error
- end, ok, FileNames) of
- ok ->
- case file:del_dir(Path) of
- ok -> ok;
- {error, Err} -> {error, {Path, Err}}
- end;
- {error, _Err} = Error ->
- Error
- end;
- {error, Err} ->
- {error, {Path, Err}}
- end
- end.
-
-is_symlink(Name) ->
- case file:read_link(Name) of
- {ok, _} -> true;
- _ -> false
- end.
-
-recursive_copy(Src, Dest) ->
- case filelib:is_dir(Src) of
- false -> case file:copy(Src, Dest) of
- {ok, _Bytes} -> ok;
- {error, enoent} -> ok; %% Path doesn't exist anyway
- {error, Err} -> {error, {Src, Dest, Err}}
- end;
- true -> case file:list_dir(Src) of
- {ok, FileNames} ->
- case file:make_dir(Dest) of
- ok ->
- lists:foldl(
- fun (FileName, ok) ->
- recursive_copy(
- filename:join(Src, FileName),
- filename:join(Dest, FileName));
- (_FileName, Error) ->
- Error
- end, ok, FileNames);
- {error, Err} ->
- {error, {Src, Dest, Err}}
- end;
- {error, Err} ->
- {error, {Src, Dest, Err}}
- end
- end.
-
dict_cons(Key, Value, Dict) ->
dict:update(Key, fun (List) -> [Value | List] end, [Value], Dict).
@@ -899,15 +752,6 @@ build_acyclic_graph(VertexFun, EdgeFun, Graph) ->
{error, Reason}
end.
-%% TODO: When we stop supporting Erlang prior to R14, this should be
-%% replaced with file:open [write, exclusive]
-lock_file(Path) ->
- case filelib:is_file(Path) of
- true -> {error, eexist};
- false -> {ok, Lock} = file:open(Path, [write]),
- ok = file:close(Lock)
- end.
-
const_ok() -> ok.
const(X) -> fun () -> X end.
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index 665b15c553..c8c18843a8 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -121,6 +121,10 @@ force_cluster(ClusterNodes) ->
%% node. If Force is false, only connections to online nodes are
%% allowed.
cluster(ClusterNodes, Force) ->
+ rabbit_misc:local_info_msg("Clustering with ~p~s~n",
+ [ClusterNodes, if Force -> " forcefully";
+ true -> ""
+ end]),
ensure_mnesia_not_running(),
ensure_mnesia_dir(),
@@ -434,7 +438,7 @@ cluster_nodes_config_filename() ->
create_cluster_nodes_config(ClusterNodes) ->
FileName = cluster_nodes_config_filename(),
- case rabbit_misc:write_term_file(FileName, [ClusterNodes]) of
+ case rabbit_file:write_term_file(FileName, [ClusterNodes]) of
ok -> ok;
{error, Reason} ->
throw({error, {cannot_create_cluster_nodes_config,
@@ -443,7 +447,7 @@ create_cluster_nodes_config(ClusterNodes) ->
read_cluster_nodes_config() ->
FileName = cluster_nodes_config_filename(),
- case rabbit_misc:read_term_file(FileName) of
+ case rabbit_file:read_term_file(FileName) of
{ok, [ClusterNodes]} -> ClusterNodes;
{error, enoent} ->
{ok, ClusterNodes} = application:get_env(rabbit, cluster_nodes),
@@ -471,12 +475,12 @@ record_running_nodes() ->
Nodes = running_clustered_nodes() -- [node()],
%% Don't check the result: we're shutting down anyway and this is
%% a best-effort-basis.
- rabbit_misc:write_term_file(FileName, [Nodes]),
+ rabbit_file:write_term_file(FileName, [Nodes]),
ok.
read_previously_running_nodes() ->
FileName = running_nodes_filename(),
- case rabbit_misc:read_term_file(FileName) of
+ case rabbit_file:read_term_file(FileName) of
{ok, [Nodes]} -> Nodes;
{error, enoent} -> [];
{error, Reason} -> throw({error, {cannot_read_previous_nodes_file,
@@ -638,7 +642,7 @@ move_db() ->
copy_db(Destination) ->
ok = ensure_mnesia_not_running(),
- rabbit_misc:recursive_copy(dir(), Destination).
+ rabbit_file:recursive_copy(dir(), Destination).
create_tables() -> create_tables(disc).
@@ -718,6 +722,9 @@ wait_for_tables(TableNames) ->
end.
reset(Force) ->
+ rabbit_misc:local_info_msg("Resetting Rabbit~s~n", [if Force -> " forcefully";
+ true -> ""
+ end]),
ensure_mnesia_not_running(),
case not Force andalso is_clustered() andalso
is_only_disc_node(node(), false)
@@ -745,7 +752,7 @@ reset(Force) ->
end,
ok = delete_cluster_nodes_config(),
%% remove persisted messages and any other garbage we find
- ok = rabbit_misc:recursive_delete(filelib:wildcard(dir() ++ "/*")),
+ ok = rabbit_file:recursive_delete(filelib:wildcard(dir() ++ "/*")),
ok.
leave_cluster([], _) -> ok;
@@ -778,19 +785,13 @@ wait_for(Condition) ->
on_node_up(Node) ->
case is_only_disc_node(Node, true) of
- true -> rabbit_misc:with_local_io(
- fun () -> rabbit_log:info("cluster contains disc "
- "nodes again~n")
- end);
+ true -> rabbit_log:info("cluster contains disc nodes again~n");
false -> ok
end.
on_node_down(Node) ->
case is_only_disc_node(Node, true) of
- true -> rabbit_misc:with_local_io(
- fun () -> rabbit_log:info("only running disc node "
- "went down~n")
- end);
+ true -> rabbit_log:info("only running disc node went down~n");
false -> ok
end.
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index cc12eb5dc3..fc3cbebd4e 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -589,7 +589,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
AttemptFileSummaryRecovery =
case ClientRefs of
- undefined -> ok = rabbit_misc:recursive_delete([Dir]),
+ undefined -> ok = rabbit_file:recursive_delete([Dir]),
ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
false;
_ -> ok = filelib:ensure_dir(filename:join(Dir, "nothing")),
@@ -1340,11 +1340,11 @@ recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
end.
store_recovery_terms(Terms, Dir) ->
- rabbit_misc:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
+ rabbit_file:write_term_file(filename:join(Dir, ?CLEAN_FILENAME), Terms).
read_recovery_terms(Dir) ->
Path = filename:join(Dir, ?CLEAN_FILENAME),
- case rabbit_misc:read_term_file(Path) of
+ case rabbit_file:read_term_file(Path) of
{ok, Terms} -> case file:delete(Path) of
ok -> {true, Terms};
{error, Error} -> {false, Error}
@@ -1901,7 +1901,7 @@ transform_dir(BaseDir, Store, TransformFun) ->
end.
transform_msg_file(FileOld, FileNew, TransformFun) ->
- ok = rabbit_misc:ensure_parent_dirs_exist(FileNew),
+ ok = rabbit_file:ensure_parent_dirs_exist(FileNew),
{ok, RefOld} = file_handle_cache:open(FileOld, [raw, binary, read], []),
{ok, RefNew} = file_handle_cache:open(FileNew, [raw, binary, write],
[{write_buffer,
diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl
index 31f476fc10..2c0912dfd6 100644
--- a/src/rabbit_networking.erl
+++ b/src/rabbit_networking.erl
@@ -320,6 +320,7 @@ connection_info_all() -> cmap(fun (Q) -> connection_info(Q) end).
connection_info_all(Items) -> cmap(fun (Q) -> connection_info(Q, Items) end).
close_connection(Pid, Explanation) ->
+ rabbit_log:info("Closing connection ~p because ~p~n", [Pid, Explanation]),
case lists:member(Pid, connections()) of
true -> rabbit_reader:shutdown(Pid, Explanation);
false -> throw({error, {not_a_connection_pid, Pid}})
diff --git a/src/rabbit_prelaunch.erl b/src/rabbit_prelaunch.erl
index 9fe073d997..cd0c322b6d 100644
--- a/src/rabbit_prelaunch.erl
+++ b/src/rabbit_prelaunch.erl
@@ -70,7 +70,7 @@ start() ->
AppVersions},
%% Write it out to $RABBITMQ_PLUGINS_EXPAND_DIR/rabbit.rel
- rabbit_misc:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
+ rabbit_file:write_file(RootName ++ ".rel", io_lib:format("~p.~n", [RDesc])),
%% We exclude mochiweb due to its optional use of fdsrv.
XRefExclude = [mochiweb],
@@ -139,7 +139,7 @@ determine_version(App) ->
{App, Vsn}.
delete_recursively(Fn) ->
- case rabbit_misc:recursive_delete([Fn]) of
+ case rabbit_file:recursive_delete([Fn]) of
ok -> ok;
{error, {Path, E}} -> {error, {cannot_delete, Path, E}};
Error -> Error
diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl
index 636913b5c8..f1751e9515 100644
--- a/src/rabbit_queue_index.erl
+++ b/src/rabbit_queue_index.erl
@@ -229,7 +229,7 @@
init(Name, OnSyncFun) ->
State = #qistate { dir = Dir } = blank_state(Name),
- false = filelib:is_file(Dir), %% is_file == is file or dir
+ false = rabbit_file:is_file(Dir), %% is_file == is file or dir
State #qistate { on_sync = OnSyncFun }.
shutdown_terms(Name) ->
@@ -256,7 +256,7 @@ terminate(Terms, State) ->
delete_and_terminate(State) ->
{_SegmentCounts, State1 = #qistate { dir = Dir }} = terminate(State),
- ok = rabbit_misc:recursive_delete([Dir]),
+ ok = rabbit_file:recursive_delete([Dir]),
State1.
publish(MsgId, SeqId, MsgProps, IsPersistent,
@@ -359,16 +359,16 @@ recover(DurableQueues) ->
{[dict:fetch(QueueDirName, DurableDict) | DurableAcc],
TermsAcc1};
false ->
- ok = rabbit_misc:recursive_delete([QueueDirPath]),
+ ok = rabbit_file:recursive_delete([QueueDirPath]),
{DurableAcc, TermsAcc}
end
end, {[], []}, QueueDirNames),
{DurableTerms, {fun queue_index_walker/1, {start, DurableQueueNames}}}.
all_queue_directory_names(Dir) ->
- case file:list_dir(Dir) of
+ case rabbit_file:list_dir(Dir) of
{ok, Entries} -> [ Entry || Entry <- Entries,
- filelib:is_dir(
+ rabbit_file:is_dir(
filename:join(Dir, Entry)) ];
{error, enoent} -> []
end.
@@ -392,18 +392,18 @@ blank_state(QueueName) ->
clean_file_name(Dir) -> filename:join(Dir, ?CLEAN_FILENAME).
detect_clean_shutdown(Dir) ->
- case file:delete(clean_file_name(Dir)) of
+ case rabbit_file:delete(clean_file_name(Dir)) of
ok -> true;
{error, enoent} -> false
end.
read_shutdown_terms(Dir) ->
- rabbit_misc:read_term_file(clean_file_name(Dir)).
+ rabbit_file:read_term_file(clean_file_name(Dir)).
store_clean_shutdown(Terms, Dir) ->
CleanFileName = clean_file_name(Dir),
- ok = filelib:ensure_dir(CleanFileName),
- rabbit_misc:write_term_file(CleanFileName, Terms).
+ ok = rabbit_file:ensure_dir(CleanFileName),
+ rabbit_file:write_term_file(CleanFileName, Terms).
init_clean(RecoveredCounts, State) ->
%% Load the journal. Since this is a clean recovery this (almost)
@@ -603,8 +603,8 @@ flush_journal(State = #qistate { segments = Segments }) ->
Segments1 =
segment_fold(
fun (#segment { unacked = 0, path = Path }, SegmentsN) ->
- case filelib:is_file(Path) of
- true -> ok = file:delete(Path);
+ case rabbit_file:is_file(Path) of
+ true -> ok = rabbit_file:delete(Path);
false -> ok
end,
SegmentsN;
@@ -630,7 +630,7 @@ append_journal_to_segment(#segment { journal_entries = JEntries,
get_journal_handle(State = #qistate { journal_handle = undefined,
dir = Dir }) ->
Path = filename:join(Dir, ?JOURNAL_FILENAME),
- ok = filelib:ensure_dir(Path),
+ ok = rabbit_file:ensure_dir(Path),
{ok, Hdl} = file_handle_cache:open(Path, ?WRITE_MODE,
[{write_buffer, infinity}]),
{Hdl, State #qistate { journal_handle = Hdl }};
@@ -735,7 +735,7 @@ all_segment_nums(#qistate { dir = Dir, segments = Segments }) ->
lists:takewhile(fun (C) -> $0 =< C andalso C =< $9 end,
SegName)), Set)
end, sets:from_list(segment_nums(Segments)),
- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)))).
+ rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)))).
segment_find_or_new(Seg, Dir, Segments) ->
case segment_find(Seg, Segments) of
@@ -836,7 +836,7 @@ segment_entries_foldr(Fun, Init,
%%
%% Does not do any combining with the journal at all.
load_segment(KeepAcked, #segment { path = Path }) ->
- case filelib:is_file(Path) of
+ case rabbit_file:is_file(Path) of
false -> {array_new(), 0};
true -> {ok, Hdl} = file_handle_cache:open(Path, ?READ_AHEAD_MODE, []),
{ok, 0} = file_handle_cache:position(Hdl, bof),
@@ -1040,12 +1040,12 @@ foreach_queue_index(Funs) ->
transform_queue(Dir, Gatherer, {JournalFun, SegmentFun}) ->
ok = transform_file(filename:join(Dir, ?JOURNAL_FILENAME), JournalFun),
[ok = transform_file(filename:join(Dir, Seg), SegmentFun)
- || Seg <- filelib:wildcard("*" ++ ?SEGMENT_EXTENSION, Dir)],
+ || Seg <- rabbit_file:wildcard(".*\\" ++ ?SEGMENT_EXTENSION, Dir)],
ok = gatherer:finish(Gatherer).
transform_file(Path, Fun) ->
PathTmp = Path ++ ".upgrade",
- case filelib:file_size(Path) of
+ case rabbit_file:file_size(Path) of
0 -> ok;
Size -> {ok, PathTmpHdl} =
file_handle_cache:open(PathTmp, ?WRITE_MODE,
@@ -1059,7 +1059,7 @@ transform_file(Path, Fun) ->
ok = drive_transform_fun(Fun, PathTmpHdl, Content),
ok = file_handle_cache:close(PathTmpHdl),
- ok = file:rename(PathTmp, Path)
+ ok = rabbit_file:rename(PathTmp, Path)
end.
drive_transform_fun(Fun, Hdl, Contents) ->
diff --git a/src/rabbit_sasl_report_file_h.erl b/src/rabbit_sasl_report_file_h.erl
index 6f3c5c75bc..963294d924 100644
--- a/src/rabbit_sasl_report_file_h.erl
+++ b/src/rabbit_sasl_report_file_h.erl
@@ -26,12 +26,17 @@
%% with the result of closing the old handler when swapping handlers.
%% The first init/1 additionally allows for simple log rotation
%% when the suffix is not the empty string.
+%% The original init/1 also opened the file in 'write' mode, thus
+%% overwriting old logs. To remedy this, init/1 from
+%% lib/sasl/src/sasl_report_file_h.erl from R14B3 was copied as
+%% init_file/1 and changed so that it opens the file in 'append' mode.
%% Used only when swapping handlers and performing
%% log rotation
init({{File, Suffix}, []}) ->
- case rabbit_misc:append_file(File, Suffix) of
- ok -> ok;
+ case rabbit_file:append_file(File, Suffix) of
+ ok -> file:delete(File),
+ ok;
{error, Error} ->
rabbit_log:error("Failed to append contents of "
"sasl log file '~s' to '~s':~n~p~n",
@@ -47,11 +52,18 @@ init({{File, _}, error}) ->
init({File, []}) ->
init(File);
init({File, _Type} = FileInfo) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- sasl_report_file_h:init(FileInfo);
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file(FileInfo);
init(File) ->
- rabbit_misc:ensure_parent_dirs_exist(File),
- sasl_report_file_h:init({File, sasl_error_logger_type()}).
+ rabbit_file:ensure_parent_dirs_exist(File),
+ init_file({File, sasl_error_logger_type()}).
+
+init_file({File, Type}) ->
+ process_flag(trap_exit, true),
+ case file:open(File, [append]) of
+ {ok,Fd} -> {ok, {Fd, File, Type}};
+ Error -> Error
+ end.
handle_event(Event, State) ->
sasl_report_file_h:handle_event(Event, State).
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 7e84251fc4..44c1337678 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -805,23 +805,11 @@ test_log_management() ->
ok = control_action(rotate_logs, []),
ok = test_logs_working(MainLog, SaslLog),
- %% log rotation on empty file
+ %% log rotation on empty files (the main log will have a ctl action logged)
ok = clean_logs([MainLog, SaslLog], Suffix),
ok = control_action(rotate_logs, []),
ok = control_action(rotate_logs, [Suffix]),
- [true, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
-
- %% original main log file is not writable
- ok = make_files_non_writable([MainLog]),
- {error, {cannot_rotate_main_logs, _}} = control_action(rotate_logs, []),
- ok = clean_logs([MainLog], Suffix),
- ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog}]),
-
- %% original sasl log file is not writable
- ok = make_files_non_writable([SaslLog]),
- {error, {cannot_rotate_sasl_logs, _}} = control_action(rotate_logs, []),
- ok = clean_logs([SaslLog], Suffix),
- ok = add_log_handlers([{rabbit_sasl_report_file_h, SaslLog}]),
+ [false, true] = empty_files([[MainLog, Suffix], [SaslLog, Suffix]]),
%% logs with suffix are not writable
ok = control_action(rotate_logs, [Suffix]),
@@ -829,27 +817,28 @@ test_log_management() ->
ok = control_action(rotate_logs, [Suffix]),
ok = test_logs_working(MainLog, SaslLog),
- %% original log files are not writable
+ %% rotate when original log files are not writable
ok = make_files_non_writable([MainLog, SaslLog]),
- {error, {{cannot_rotate_main_logs, _},
- {cannot_rotate_sasl_logs, _}}} = control_action(rotate_logs, []),
+ ok = control_action(rotate_logs, []),
- %% logging directed to tty (handlers were removed in last test)
+ %% logging directed to tty (first, remove handlers)
+ ok = delete_log_handlers([rabbit_sasl_report_file_h,
+ rabbit_error_logger_file_h]),
ok = clean_logs([MainLog, SaslLog], Suffix),
- ok = application:set_env(sasl, sasl_error_logger, tty),
- ok = application:set_env(kernel, error_logger, tty),
+ ok = application:set_env(rabbit, sasl_error_logger, tty),
+ ok = application:set_env(rabbit, error_logger, tty),
ok = control_action(rotate_logs, []),
[{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
%% rotate logs when logging is turned off
- ok = application:set_env(sasl, sasl_error_logger, false),
- ok = application:set_env(kernel, error_logger, silent),
+ ok = application:set_env(rabbit, sasl_error_logger, false),
+ ok = application:set_env(rabbit, error_logger, silent),
ok = control_action(rotate_logs, []),
[{error, enoent}, {error, enoent}] = empty_files([MainLog, SaslLog]),
%% cleanup
- ok = application:set_env(sasl, sasl_error_logger, {file, SaslLog}),
- ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),
+ ok = application:set_env(rabbit, error_logger, {file, MainLog}),
ok = add_log_handlers([{rabbit_error_logger_file_h, MainLog},
{rabbit_sasl_report_file_h, SaslLog}]),
passed.
@@ -860,8 +849,8 @@ test_log_management_during_startup() ->
%% start application with simple tty logging
ok = control_action(stop_app, []),
- ok = application:set_env(kernel, error_logger, tty),
- ok = application:set_env(sasl, sasl_error_logger, tty),
+ ok = application:set_env(rabbit, error_logger, tty),
+ ok = application:set_env(rabbit, sasl_error_logger, tty),
ok = add_log_handlers([{error_logger_tty_h, []},
{sasl_report_tty_h, []}]),
ok = control_action(start_app, []),
@@ -878,13 +867,12 @@ test_log_management_during_startup() ->
end,
%% fix sasl logging
- ok = application:set_env(sasl, sasl_error_logger,
- {file, SaslLog}),
+ ok = application:set_env(rabbit, sasl_error_logger, {file, SaslLog}),
%% start application with logging to non-existing directory
TmpLog = "/tmp/rabbit-tests/test.log",
delete_file(TmpLog),
- ok = application:set_env(kernel, error_logger, {file, TmpLog}),
+ ok = application:set_env(rabbit, error_logger, {file, TmpLog}),
ok = delete_log_handlers([rabbit_error_logger_file_h]),
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
@@ -905,7 +893,7 @@ test_log_management_during_startup() ->
%% start application with logging to a subdirectory which
%% parent directory has no write permissions
TmpTestDir = "/tmp/rabbit-tests/no-permission/test/log",
- ok = application:set_env(kernel, error_logger, {file, TmpTestDir}),
+ ok = application:set_env(rabbit, error_logger, {file, TmpTestDir}),
ok = add_log_handlers([{error_logger_file_h, MainLog}]),
ok = case control_action(start_app, []) of
ok -> exit({got_success_but_expected_failure,
@@ -920,7 +908,7 @@ test_log_management_during_startup() ->
%% start application with standard error_logger_file_h
%% handler not installed
- ok = application:set_env(kernel, error_logger, {file, MainLog}),
+ ok = application:set_env(rabbit, error_logger, {file, MainLog}),
ok = control_action(start_app, []),
ok = control_action(stop_app, []),
@@ -1765,7 +1753,11 @@ test_file_handle_cache() ->
[filename:join(TmpDir, Str) || Str <- ["file1", "file2", "file3", "file4"]],
Content = <<"foo">>,
CopyFun = fun (Src, Dst) ->
- ok = rabbit_misc:write_file(Src, Content),
+ {ok, Hdl} = prim_file:open(Src, [binary, write]),
+ ok = prim_file:write(Hdl, Content),
+ ok = prim_file:sync(Hdl),
+ prim_file:close(Hdl),
+
{ok, SrcHdl} = file_handle_cache:open(Src, [read], []),
{ok, DstHdl} = file_handle_cache:open(Dst, [write], []),
Size = size(Content),
diff --git a/src/rabbit_trace.erl b/src/rabbit_trace.erl
index f9632324be..58079ccf47 100644
--- a/src/rabbit_trace.erl
+++ b/src/rabbit_trace.erl
@@ -67,9 +67,11 @@ tap_trace_out({#resource{name = QName}, _QPid, _QMsgId, Redelivered, Msg},
%%----------------------------------------------------------------------------
start(VHost) ->
+ rabbit_log:info("Enabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> [VHost | VHosts -- [VHost]] end).
stop(VHost) ->
+ rabbit_log:info("Disabling tracing for vhost '~s'~n", [VHost]),
update_config(fun (VHosts) -> VHosts -- [VHost] end).
update_config(Fun) ->
diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl
index e7a302f80d..717d94a802 100644
--- a/src/rabbit_upgrade.erl
+++ b/src/rabbit_upgrade.erl
@@ -115,7 +115,7 @@ ensure_backup_removed() ->
end.
remove_backup() ->
- ok = rabbit_misc:recursive_delete([backup_dir()]),
+ ok = rabbit_file:recursive_delete([backup_dir()]),
info("upgrades: Mnesia backup removed~n", []).
maybe_upgrade_mnesia() ->
@@ -249,7 +249,7 @@ maybe_upgrade_local() ->
%% -------------------------------------------------------------------
apply_upgrades(Scope, Upgrades, Fun) ->
- ok = rabbit_misc:lock_file(lock_filename()),
+ ok = rabbit_file:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
Fun(),
diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl
index 400abc1083..f6bcbb7fd5 100644
--- a/src/rabbit_version.erl
+++ b/src/rabbit_version.erl
@@ -49,12 +49,12 @@
%% -------------------------------------------------------------------
-recorded() -> case rabbit_misc:read_term_file(schema_filename()) of
+recorded() -> case rabbit_file:read_term_file(schema_filename()) of
{ok, [V]} -> {ok, V};
{error, _} = Err -> Err
end.
-record(V) -> ok = rabbit_misc:write_term_file(schema_filename(), [V]).
+record(V) -> ok = rabbit_file:write_term_file(schema_filename(), [V]).
recorded_for_scope(Scope) ->
case recorded() of
diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl
index 08d6c99a9c..38bb76b03b 100644
--- a/src/rabbit_vhost.erl
+++ b/src/rabbit_vhost.erl
@@ -44,6 +44,7 @@
-define(INFO_KEYS, [name, tracing]).
add(VHostPath) ->
+ rabbit_log:info("Adding vhost '~s'~n", [VHostPath]),
R = rabbit_misc:execute_mnesia_transaction(
fun () ->
case mnesia:wread({rabbit_vhost, VHostPath}) of
@@ -69,7 +70,6 @@ add(VHostPath) ->
{<<"amq.rabbitmq.trace">>, topic}]],
ok
end),
- rabbit_log:info("Added vhost ~p~n", [VHostPath]),
R.
delete(VHostPath) ->
@@ -78,6 +78,7 @@ delete(VHostPath) ->
%% process, which in turn results in further mnesia actions and
%% eventually the termination of that process. Exchange deletion causes
%% notifications which must be sent outside the TX
+ rabbit_log:info("Deleting vhost '~s'~n", [VHostPath]),
[{ok,_} = rabbit_amqqueue:delete(Q, false, false) ||
Q <- rabbit_amqqueue:list(VHostPath)],
[ok = rabbit_exchange:delete(Name, false) ||
@@ -86,7 +87,6 @@ delete(VHostPath) ->
with(VHostPath, fun () ->
ok = internal_delete(VHostPath)
end)),
- rabbit_log:info("Deleted vhost ~p~n", [VHostPath]),
R.
internal_delete(VHostPath) ->