summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthias Radestock <matthias@lshift.net>2010-04-26 21:05:43 +0100
committerMatthias Radestock <matthias@lshift.net>2010-04-26 21:05:43 +0100
commit1a9e37fbe7576b5312192f3eeecbb3fccef10507 (patch)
tree976af0d2a2006114ea3768eeaf42233bb18812eb /src
parent7b3fd92c552d9b114c25f7f8077185fcfe92c2ea (diff)
parent2abce2db6cb2af6f0f55efdc8f409f8b7a0470e5 (diff)
downloadrabbitmq-server-git-1a9e37fbe7576b5312192f3eeecbb3fccef10507.tar.gz
merge default into bug22616
Diffstat (limited to 'src')
-rw-r--r--src/gen_server2.erl6
-rw-r--r--src/rabbit.erl14
-rw-r--r--src/rabbit_binary_generator.erl53
-rw-r--r--src/rabbit_guid.erl14
-rw-r--r--src/rabbit_misc.erl6
-rw-r--r--src/rabbit_persister.erl169
-rw-r--r--src/rabbit_tests.erl4
7 files changed, 138 insertions, 128 deletions
diff --git a/src/gen_server2.erl b/src/gen_server2.erl
index c33582e30d..94a23fb90a 100644
--- a/src/gen_server2.erl
+++ b/src/gen_server2.erl
@@ -607,9 +607,9 @@ process_msg(Parent, Name, State, Mod, Time, TimeoutState, Queue,
Debug, Msg) ->
case Msg of
{system, From, Req} ->
- sys:handle_system_msg
- (Req, From, Parent, ?MODULE, Debug,
- [Name, State, Mod, Time, TimeoutState, Queue]);
+ sys:handle_system_msg(
+ Req, From, Parent, ?MODULE, Debug,
+ [Name, State, Mod, Time, TimeoutState, Queue]);
%% gen_server puts Hib on the end as the 7th arg, but that
%% version of the function seems not to be documented so
%% leaving out for now.
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 733a7a88cd..584a33ab5c 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -98,6 +98,13 @@
{requires, rabbit_alarm},
{enables, core_initialized}]}).
+-rabbit_boot_step({guid_generator,
+ [{description, "guid generator"},
+ {mfa, {rabbit_sup, start_restartable_child,
+ [rabbit_guid]}},
+ {requires, kernel_ready},
+ {enables, core_initialized}]}).
+
-rabbit_boot_step({rabbit_router,
[{description, "cluster router"},
{mfa, {rabbit_sup, start_restartable_child,
@@ -135,13 +142,6 @@
[rabbit_persister]}},
{requires, queue_sup_queue_recovery}]}).
--rabbit_boot_step({guid_generator,
- [{description, "guid generator"},
- {mfa, {rabbit_sup, start_restartable_child,
- [rabbit_guid]}},
- {requires, persister},
- {enables, routing_ready}]}).
-
-rabbit_boot_step({routing_ready,
[{description, "message delivery logic ready"}]}).
diff --git a/src/rabbit_binary_generator.erl b/src/rabbit_binary_generator.erl
index 1d47d7640e..ed84373585 100644
--- a/src/rabbit_binary_generator.erl
+++ b/src/rabbit_binary_generator.erl
@@ -95,33 +95,36 @@ maybe_encode_properties(_ContentProperties, ContentPropertiesBin)
maybe_encode_properties(ContentProperties, none) ->
rabbit_framing:encode_properties(ContentProperties).
-build_content_frames(FragmentsRev, FrameMax, ChannelInt) ->
- BodyPayloadMax = if
- FrameMax == 0 ->
- none;
- true ->
+build_content_frames(FragsRev, FrameMax, ChannelInt) ->
+ BodyPayloadMax = if FrameMax == 0 ->
+ iolist_size(FragsRev);
+ true ->
FrameMax - ?EMPTY_CONTENT_BODY_FRAME_SIZE
end,
- build_content_frames(0, [], FragmentsRev, BodyPayloadMax, ChannelInt).
-
-build_content_frames(SizeAcc, FragmentAcc, [], _BodyPayloadMax, _ChannelInt) ->
- {SizeAcc, FragmentAcc};
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt)
- when is_number(BodyPayloadMax) and (size(Fragment) > BodyPayloadMax) ->
- <<Head:BodyPayloadMax/binary, Tail/binary>> = Fragment,
- build_content_frames(SizeAcc, FragmentAcc, [Tail, Head | FragmentsRev],
- BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [<<>> | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc, FragmentAcc, FragmentsRev, BodyPayloadMax, ChannelInt);
-build_content_frames(SizeAcc, FragmentAcc, [Fragment | FragmentsRev],
- BodyPayloadMax, ChannelInt) ->
- build_content_frames(SizeAcc + size(Fragment),
- [create_frame(3, ChannelInt, Fragment) | FragmentAcc],
- FragmentsRev,
- BodyPayloadMax,
- ChannelInt).
+ build_content_frames(0, [], BodyPayloadMax, [],
+ lists:reverse(FragsRev), BodyPayloadMax, ChannelInt).
+
+build_content_frames(SizeAcc, FramesAcc, _FragSizeRem, [],
+ [], _BodyPayloadMax, _ChannelInt) ->
+ {SizeAcc, lists:reverse(FramesAcc)};
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ Frags, BodyPayloadMax, ChannelInt)
+ when FragSizeRem == 0 orelse Frags == [] ->
+ Frame = create_frame(3, ChannelInt, lists:reverse(FragAcc)),
+ FrameSize = BodyPayloadMax - FragSizeRem,
+ build_content_frames(SizeAcc + FrameSize, [Frame | FramesAcc],
+ BodyPayloadMax, [], Frags, BodyPayloadMax, ChannelInt);
+build_content_frames(SizeAcc, FramesAcc, FragSizeRem, FragAcc,
+ [Frag | Frags], BodyPayloadMax, ChannelInt) ->
+ Size = size(Frag),
+ {NewFragSizeRem, NewFragAcc, NewFrags} =
+ case Size =< FragSizeRem of
+ true -> {FragSizeRem - Size, [Frag | FragAcc], Frags};
+ false -> <<Head:FragSizeRem/binary, Tail/binary>> = Frag,
+ {0, [Head | FragAcc], [Tail | Frags]}
+ end,
+ build_content_frames(SizeAcc, FramesAcc, NewFragSizeRem, NewFragAcc,
+ NewFrags, BodyPayloadMax, ChannelInt).
build_heartbeat_frame() ->
create_frame(?FRAME_HEARTBEAT, 0, <<>>).
diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl
index 2fa531a7ce..1ae8f7dac4 100644
--- a/src/rabbit_guid.erl
+++ b/src/rabbit_guid.erl
@@ -67,7 +67,7 @@ update_disk_serial() ->
Filename = filename:join(rabbit_mnesia:dir(), ?SERIAL_FILENAME),
Serial = case rabbit_misc:read_term_file(Filename) of
{ok, [Num]} -> Num;
- {error, enoent} -> rabbit_persister:serial();
+ {error, enoent} -> 0;
{error, Reason} ->
throw({error, {cannot_read_serial_file, Filename, Reason}})
end,
@@ -78,7 +78,7 @@ update_disk_serial() ->
end,
Serial.
-%% generate a guid that is monotonically increasing per process.
+%% generate a GUID.
%%
%% The id is only unique within a single cluster and as long as the
%% serial store hasn't been deleted.
@@ -92,20 +92,18 @@ guid() ->
%% A persisted serial number, in combination with self/0 (which
%% includes the node name) uniquely identifies a process in space
%% and time. We combine that with a process-local counter to give
- %% us a GUID that is monotonically increasing per process.
+ %% us a GUID.
G = case get(guid) of
undefined -> {{gen_server:call(?SERVER, serial, infinity), self()},
0};
{S, I} -> {S, I+1}
end,
put(guid, G),
- G.
+ erlang:md5(term_to_binary(G)).
-%% generate a readable string representation of a guid. Note that any
-%% monotonicity of the guid is not preserved in the encoding.
+%% generate a readable string representation of a GUID.
string_guid(Prefix) ->
- Prefix ++ "-" ++ base64:encode_to_string(
- erlang:md5(term_to_binary(guid()))).
+ Prefix ++ "-" ++ base64:encode_to_string(guid()).
binstring_guid(Prefix) ->
list_to_binary(string_guid(Prefix)).
diff --git a/src/rabbit_misc.erl b/src/rabbit_misc.erl
index 028b0d73ea..2c1808465f 100644
--- a/src/rabbit_misc.erl
+++ b/src/rabbit_misc.erl
@@ -43,6 +43,7 @@
-export([r/3, r/2, r_arg/4, rs/1]).
-export([enable_cover/0, report_cover/0]).
-export([enable_cover/1, report_cover/1]).
+-export([start_cover/1]).
-export([throw_on_error/2, with_exit_handler/2, filter_exit_map/2]).
-export([with_user/2, with_vhost/2, with_user_and_vhost/3]).
-export([execute_mnesia_transaction/1]).
@@ -97,6 +98,7 @@
undefined | r(K) when is_subtype(K, atom())).
-spec(rs/1 :: (r(atom())) -> string()).
-spec(enable_cover/0 :: () -> ok_or_error()).
+-spec(start_cover/1 :: ([{string(), string()} | string()]) -> 'ok').
-spec(report_cover/0 :: () -> 'ok').
-spec(enable_cover/1 :: (file_path()) -> ok_or_error()).
-spec(report_cover/1 :: (file_path()) -> 'ok').
@@ -228,6 +230,10 @@ enable_cover(Root) ->
_ -> ok
end.
+start_cover(NodesS) ->
+ {ok, _} = cover:start([makenode(N) || N <- NodesS]),
+ ok.
+
report_cover() ->
report_cover(".").
diff --git a/src/rabbit_persister.erl b/src/rabbit_persister.erl
index 8aa5ad8d32..a9e0cab928 100644
--- a/src/rabbit_persister.erl
+++ b/src/rabbit_persister.erl
@@ -40,7 +40,7 @@
-export([transaction/1, extend_transaction/2, dirty_work/1,
commit_transaction/1, rollback_transaction/1,
- force_snapshot/0, serial/0]).
+ force_snapshot/0]).
-include("rabbit.hrl").
@@ -49,11 +49,7 @@
-define(LOG_BUNDLE_DELAY, 5).
-define(COMPLETE_BUNDLE_DELAY, 2).
--define(HIBERNATE_AFTER, 10000).
-
--define(MAX_WRAP_ENTRIES, 500).
-
--define(PERSISTER_LOG_FORMAT_VERSION, {2, 4}).
+-define(PERSISTER_LOG_FORMAT_VERSION, {2, 6}).
-record(pstate, {log_handle, entry_count, deadline,
pending_logs, pending_replies,
@@ -64,7 +60,7 @@
%% the other maps a key to one or more queues.
%% The aim is to reduce the overload of storing a message multiple times
%% when it appears in several queues.
--record(psnapshot, {serial, transactions, messages, queues}).
+-record(psnapshot, {transactions, messages, queues, next_seq_id}).
%%----------------------------------------------------------------------------
@@ -78,12 +74,11 @@
-spec(start_link/0 :: () -> {'ok', pid()} | 'ignore' | {'error', any()}).
-spec(transaction/1 :: ([work_item()]) -> 'ok').
--spec(extend_transaction/2 :: (txn(), [work_item()]) -> 'ok').
+-spec(extend_transaction/2 :: ({txn(), queue_name()}, [work_item()]) -> 'ok').
-spec(dirty_work/1 :: ([work_item()]) -> 'ok').
--spec(commit_transaction/1 :: (txn()) -> 'ok').
--spec(rollback_transaction/1 :: (txn()) -> 'ok').
+-spec(commit_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
+-spec(rollback_transaction/1 :: ({txn(), queue_name()}) -> 'ok').
-spec(force_snapshot/0 :: () -> 'ok').
--spec(serial/0 :: () -> non_neg_integer()).
-endif.
@@ -116,19 +111,16 @@ rollback_transaction(TxnKey) ->
force_snapshot() ->
gen_server:call(?SERVER, force_snapshot, infinity).
-serial() ->
- gen_server:call(?SERVER, serial, infinity).
-
%%--------------------------------------------------------------------
init(_Args) ->
process_flag(trap_exit, true),
FileName = base_filename(),
ok = filelib:ensure_dir(FileName),
- Snapshot = #psnapshot{serial = 0,
- transactions = dict:new(),
+ Snapshot = #psnapshot{transactions = dict:new(),
messages = ets:new(messages, []),
- queues = ets:new(queues, [])},
+ queues = ets:new(queues, []),
+ next_seq_id = 0},
LogHandle =
case disk_log:open([{name, rabbit_persister},
{head, current_snapshot(Snapshot)},
@@ -143,9 +135,7 @@ init(_Args) ->
[Recovered, Bad]),
LH
end,
- {Res, LoadedSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
- NewSnapshot = LoadedSnapshot#psnapshot{
- serial = LoadedSnapshot#psnapshot.serial + 1},
+ {Res, NewSnapshot} = internal_load_snapshot(LogHandle, Snapshot),
case Res of
ok ->
ok = take_snapshot(LogHandle, NewSnapshot);
@@ -153,12 +143,12 @@ init(_Args) ->
rabbit_log:error("Failed to load persister log: ~p~n", [Reason]),
ok = take_snapshot_and_save_old(LogHandle, NewSnapshot)
end,
- State = #pstate{log_handle = LogHandle,
- entry_count = 0,
- deadline = infinity,
- pending_logs = [],
+ State = #pstate{log_handle = LogHandle,
+ entry_count = 0,
+ deadline = infinity,
+ pending_logs = [],
pending_replies = [],
- snapshot = NewSnapshot},
+ snapshot = NewSnapshot},
{ok, State}.
handle_call({transaction, Key, MessageList}, From, State) ->
@@ -168,9 +158,6 @@ handle_call({commit_transaction, TxnKey}, From, State) ->
do_noreply(internal_commit(From, TxnKey, State));
handle_call(force_snapshot, _From, State) ->
do_reply(ok, flush(true, State));
-handle_call(serial, _From,
- State = #pstate{snapshot = #psnapshot{serial = Serial}}) ->
- do_reply(Serial, State);
handle_call(_Request, _From, State) ->
{noreply, State}.
@@ -236,8 +223,7 @@ complete(From, Item, State = #pstate{deadline = ExistingDeadline,
%% "tied" is met.
log_work(CreateWorkUnit, MessageList,
State = #pstate{
- snapshot = Snapshot = #psnapshot{
- messages = Messages}}) ->
+ snapshot = Snapshot = #psnapshot{messages = Messages}}) ->
Unit = CreateWorkUnit(
rabbit_misc:map_in_order(
fun(M = {publish, Message, QK = {_QName, PKey}}) ->
@@ -282,12 +268,15 @@ take_snapshot_and_save_old(LogHandle, Snapshot) ->
maybe_take_snapshot(Force, State = #pstate{entry_count = EntryCount,
log_handle = LH,
- snapshot = Snapshot})
- when Force orelse EntryCount >= ?MAX_WRAP_ENTRIES ->
- ok = take_snapshot(LH, Snapshot),
- State#pstate{entry_count = 0};
-maybe_take_snapshot(_Force, State) ->
- State.
+ snapshot = Snapshot}) ->
+ {ok, MaxWrapEntries} = application:get_env(persister_max_wrap_entries),
+ if
+ Force orelse EntryCount >= MaxWrapEntries ->
+ ok = take_snapshot(LH, Snapshot),
+ State#pstate{entry_count = 0};
+ true ->
+ State
+ end.
later_ms(DeltaMilliSec) ->
{MegaSec, Sec, MicroSec} = now(),
@@ -304,7 +293,8 @@ compute_deadline(_TimerDelay, ExistingDeadline) ->
ExistingDeadline.
compute_timeout(infinity) ->
- ?HIBERNATE_AFTER;
+ {ok, HibernateAfter} = application:get_env(persister_hibernate_after),
+ HibernateAfter;
compute_timeout(Deadline) ->
DeltaMilliSec = time_diff(Deadline, now()) * 1000.0,
if
@@ -343,20 +333,20 @@ flush(ForceSnapshot, State = #pstate{pending_logs = PendingLogs,
pending_logs = [],
pending_replies = []}.
-current_snapshot(_Snapshot = #psnapshot{serial = Serial,
- transactions= Ts,
- messages = Messages,
- queues = Queues}) ->
+current_snapshot(_Snapshot = #psnapshot{transactions = Ts,
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = NextSeqId}) ->
%% Avoid infinite growth of the table by removing messages not
%% bound to a queue anymore
prune_table(Messages, ets:foldl(
- fun ({{_QName, PKey}, _Delivered}, S) ->
+ fun ({{_QName, PKey}, _Delivered, _SeqId}, S) ->
sets:add_element(PKey, S)
end, sets:new(), Queues)),
- InnerSnapshot = {{serial, Serial},
- {txns, Ts},
+ InnerSnapshot = {{txns, Ts},
{messages, ets:tab2list(Messages)},
- {queues, ets:tab2list(Queues)}},
+ {queues, ets:tab2list(Queues)},
+ {next_seq_id, NextSeqId}},
?LOGDEBUG("Inner snapshot: ~p~n", [InnerSnapshot]),
{persist_snapshot, {vsn, ?PERSISTER_LOG_FORMAT_VERSION},
term_to_binary(InnerSnapshot)}.
@@ -380,14 +370,14 @@ internal_load_snapshot(LogHandle,
{K, [Loaded_Snapshot | Items]} = disk_log:chunk(LogHandle, start),
case check_version(Loaded_Snapshot) of
{ok, StateBin} ->
- {{serial, Serial}, {txns, Ts}, {messages, Ms}, {queues, Qs}} =
- binary_to_term(StateBin),
+ {{txns, Ts}, {messages, Ms}, {queues, Qs},
+ {next_seq_id, NextSeqId}} = binary_to_term(StateBin),
true = ets:insert(Messages, Ms),
true = ets:insert(Queues, Qs),
Snapshot1 = replay(Items, LogHandle, K,
Snapshot#psnapshot{
- serial = Serial,
- transactions = Ts}),
+ transactions = Ts,
+ next_seq_id = NextSeqId}),
Snapshot2 = requeue_messages(Snapshot1),
%% uncompleted transactions are discarded - this is TRTTD
%% since we only get into this code on node restart, so
@@ -407,8 +397,8 @@ check_version(_Other) ->
requeue_messages(Snapshot = #psnapshot{messages = Messages,
queues = Queues}) ->
Work = ets:foldl(
- fun ({{QName, PKey}, Delivered}, Acc) ->
- rabbit_misc:dict_cons(QName, {PKey, Delivered}, Acc)
+ fun ({{QName, PKey}, Delivered, SeqId}, Acc) ->
+ rabbit_misc:dict_cons(QName, {SeqId, PKey, Delivered}, Acc)
end, dict:new(), Queues),
%% unstable parallel map, because order doesn't matter
L = lists:append(
@@ -419,8 +409,8 @@ requeue_messages(Snapshot = #psnapshot{messages = Messages,
fun ({QName, Requeues}) ->
requeue(QName, Requeues, Messages)
end, dict:to_list(Work))),
- NewMessages = [{K, M} || {{_Q, K}, M, _D} <- L],
- NewQueues = [{QK, D} || {QK, _M, D} <- L],
+ NewMessages = [{K, M} || {_S, _Q, K, M, _D} <- L],
+ NewQueues = [{{Q, K}, D, S} || {S, Q, K, _M, D} <- L],
ets:delete_all_objects(Messages),
ets:delete_all_objects(Queues),
true = ets:insert(Messages, NewMessages),
@@ -432,8 +422,8 @@ requeue(QName, Requeues, Messages) ->
case rabbit_amqqueue:lookup(QName) of
{ok, #amqqueue{pid = QPid}} ->
RequeueMessages =
- [{{QName, PKey}, Message, Delivered} ||
- {PKey, Delivered} <- Requeues,
+ [{SeqId, QName, PKey, Message, Delivered} ||
+ {SeqId, PKey, Delivered} <- Requeues,
{_, Message} <- ets:lookup(Messages, PKey)],
rabbit_amqqueue:redeliver(
QPid,
@@ -443,7 +433,7 @@ requeue(QName, Requeues, Messages) ->
%% per-channel basis, and channels are bound to specific
%% processes, sorting the list does provide the correct
%% ordering properties.
- [{Message, Delivered} || {_, Message, Delivered} <-
+ [{Message, Delivered} || {_, _, _, Message, Delivered} <-
lists:sort(RequeueMessages)]),
RequeueMessages;
{error, not_found} ->
@@ -477,39 +467,48 @@ internal_integrate1({rollback_transaction, Key},
Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
internal_integrate1({commit_transaction, Key},
Snapshot = #psnapshot{transactions = Transactions,
- messages = Messages,
- queues = Queues}) ->
+ messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
case dict:find(Key, Transactions) of
{ok, MessageLists} ->
?LOGDEBUG("persist committing txn ~p~n", [Key]),
- lists:foreach(fun (ML) -> perform_work(ML, Messages, Queues) end,
- lists:reverse(MessageLists)),
- Snapshot#psnapshot{transactions = dict:erase(Key, Transactions)};
+ NextSeqId =
+ lists:foldr(
+ fun (ML, SeqIdN) ->
+ perform_work(ML, Messages, Queues, SeqIdN) end,
+ SeqId, MessageLists),
+ Snapshot#psnapshot{transactions = dict:erase(Key, Transactions),
+ next_seq_id = NextSeqId};
error ->
Snapshot
end;
internal_integrate1({dirty_work, MessageList},
- Snapshot = #psnapshot {messages = Messages,
- queues = Queues}) ->
- perform_work(MessageList, Messages, Queues),
- Snapshot.
-
-perform_work(MessageList, Messages, Queues) ->
- lists:foreach(
- fun (Item) -> perform_work_item(Item, Messages, Queues) end,
- MessageList).
-
-perform_work_item({publish, Message, QK = {_QName, PKey}}, Messages, Queues) ->
- ets:insert(Messages, {PKey, Message}),
- ets:insert(Queues, {QK, false});
-
-perform_work_item({tied, QK}, _Messages, Queues) ->
- ets:insert(Queues, {QK, false});
-
-perform_work_item({deliver, QK}, _Messages, Queues) ->
- %% from R12B-2 onward we could use ets:update_element/3 here
- ets:delete(Queues, QK),
- ets:insert(Queues, {QK, true});
-
-perform_work_item({ack, QK}, _Messages, Queues) ->
- ets:delete(Queues, QK).
+ Snapshot = #psnapshot{messages = Messages,
+ queues = Queues,
+ next_seq_id = SeqId}) ->
+ Snapshot#psnapshot{next_seq_id = perform_work(MessageList, Messages,
+ Queues, SeqId)}.
+
+perform_work(MessageList, Messages, Queues, SeqId) ->
+ lists:foldl(fun (Item, NextSeqId) ->
+ perform_work_item(Item, Messages, Queues, NextSeqId)
+ end, SeqId, MessageList).
+
+perform_work_item({publish, Message, QK = {_QName, PKey}},
+ Messages, Queues, NextSeqId) ->
+ true = ets:insert(Messages, {PKey, Message}),
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({tied, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:insert(Queues, {QK, false, NextSeqId}),
+ NextSeqId + 1;
+
+perform_work_item({deliver, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:update_element(Queues, QK, {2, true}),
+ NextSeqId;
+
+perform_work_item({ack, QK}, _Messages, Queues, NextSeqId) ->
+ true = ets:delete(Queues, QK),
+ NextSeqId.
diff --git a/src/rabbit_tests.erl b/src/rabbit_tests.erl
index 82f2d19918..d645d183d1 100644
--- a/src/rabbit_tests.erl
+++ b/src/rabbit_tests.erl
@@ -625,8 +625,12 @@ test_cluster_management2(SecondaryNode) ->
ok = control_action(stop_app, []),
%% NB: this will log an inconsistent_database error, which is harmless
+ %% Turning cover on / off is OK even if we're not in general using cover,
+ %% it just turns the engine on / off, doesn't actually log anything.
+ cover:stop([SecondaryNode]),
true = disconnect_node(SecondaryNode),
pong = net_adm:ping(SecondaryNode),
+ cover:start([SecondaryNode]),
%% leaving a cluster as a ram node
ok = control_action(reset, []),