summaryrefslogtreecommitdiff
path: root/test
diff options
context:
space:
mode:
authorJean-Sébastien Pédron <jean-sebastien@rabbitmq.com>2016-04-20 16:10:51 +0200
committerJean-Sébastien Pédron <jean-sebastien.pedron@dumbbell.fr>2016-05-27 10:37:28 +0200
commit83643e45cc43a0f852cc60b27ca3e015635854ff (patch)
tree6461b032af83c408d50324f482a56d64b7aa943e /test
parentff951ac32deebd7e5f867459a12a9e3799097c74 (diff)
downloadrabbitmq-server-git-83643e45cc43a0f852cc60b27ca3e015635854ff.tar.gz
Switch testsuite to common_test, part #3
The migrated tests are those from `$(WITH_BROKER_TEST_COMMANDS)` and `$(STANDALONE_TEST_COMMANDS)`. References #725. [#116526487]
Diffstat (limited to 'test')
-rw-r--r--test/channel_operation_timeout_SUITE.erl196
-rw-r--r--test/channel_operation_timeout_test_queue.erl2443
-rw-r--r--test/cluster_rename_SUITE.erl304
-rw-r--r--test/clustering_management_SUITE.erl728
-rw-r--r--test/crashing_queues_SUITE.erl269
-rw-r--r--test/dynamic_ha_SUITE.erl329
-rw-r--r--test/eager_sync_SUITE.erl278
-rw-r--r--test/inet_proxy_dist.erl201
-rw-r--r--test/inet_tcp_proxy.erl134
-rw-r--r--test/inet_tcp_proxy_manager.erl107
-rw-r--r--test/lazy_queue_SUITE.erl224
-rw-r--r--test/many_node_ha_SUITE.erl117
-rw-r--r--test/partitions_SUITE.erl438
-rw-r--r--test/priority_queue_SUITE.erl558
-rw-r--r--test/queue_master_location_SUITE.erl271
-rw-r--r--test/rabbit_ha_test_consumer.erl114
-rw-r--r--test/rabbit_ha_test_producer.erl119
-rw-r--r--test/simple_ha_SUITE.erl216
-rw-r--r--test/sync_detection_SUITE.erl252
19 files changed, 7298 insertions, 0 deletions
diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl
new file mode 100644
index 0000000000..7b41b9c225
--- /dev/null
+++ b/test/channel_operation_timeout_SUITE.erl
@@ -0,0 +1,196 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(channel_operation_timeout_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile([export_all]).
+
+-import(rabbit_misc, [pget/2]).
+
+-define(CONFIG, [cluster_ab]).
+-define(DEFAULT_VHOST, <<"/">>).
+-define(QRESOURCE(Q), rabbit_misc:r(?DEFAULT_VHOST, queue, Q)).
+-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
+-define(DELAY, 25).
+
+%% Common Test callback: the complete list of testcases in this suite.
+all() ->
+ [
+ notify_down_all
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Suite-level setup: log the test environment, then run the shared
+%% rabbit_ct_helpers setup steps. Broker nodes are started per testcase
+%% (see init_per_testcase/2), not here.
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+%% Suite-level teardown: undo whatever init_per_suite/1 set up.
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% No groups are defined in all/0, so group setup/teardown are no-ops.
+init_per_group(_, Config) ->
+ Config.
+
+end_per_group(_, Config) ->
+ Config.
+
+%% Per-testcase setup: start a fresh 2-node cluster for each testcase.
+%% The TCP port base is offset by testcase number so concurrently running
+%% testcases do not collide on listener ports.
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = 2,
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, ClusterSize},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps()).
+
+%% Per-testcase teardown: tear down clients first, then the broker nodes
+%% (reverse order of setup).
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Testcase: verify that rabbit_amqqueue:notify_down_all is invoked when a
+%% channel terminates, both when queues answer in time (consumers are
+%% cleaned up) and when the configured channel_operation_timeout fires
+%% (the channel process still dies rather than hanging forever).
+notify_down_all(Config) ->
+ Rabbit = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ RabbitCh = rabbit_ct_client_helpers:open_channel(Config, 0),
+ HareCh = rabbit_ct_client_helpers:open_channel(Config, 1),
+
+ %% success
+ set_channel_operation_timeout_config(Config, 1000),
+ configure_bq(Config),
+ QCfg0 = qconfig(RabbitCh, <<"q0">>, <<"ex0">>, true, false),
+ declare(QCfg0),
+ %% Testing rabbit_amqqueue:notify_down_all via rabbit_channel.
+ %% Consumer count = 0 after correct channel termination and
+ %% notification of queues via delegate:call/3
+ true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))),
+ rabbit_ct_client_helpers:close_channel(RabbitCh),
+ 0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)),
+ false = is_process_alive(RabbitCh),
+
+ %% fail
+ %% With a 10ms timeout and the test backing queue stalling on the
+ %% published ?TIMEOUT_TEST_MSG, channel close must still complete.
+ set_channel_operation_timeout_config(Config, 10),
+ QCfg2 = qconfig(HareCh, <<"q1">>, <<"ex1">>, true, false),
+ declare(QCfg2),
+ publish(QCfg2, ?TIMEOUT_TEST_MSG),
+ timer:sleep(?DELAY),
+ rabbit_ct_client_helpers:close_channel(HareCh),
+ timer:sleep(?DELAY),
+ false = is_process_alive(HareCh),
+
+ pass.
+
+%% -------------------------
+%% Internal helper functions
+%% -------------------------
+
+%% Set the `channel_operation_timeout` application env var (in ms) on every
+%% node of the cluster; asserts each RPC returned ok.
+set_channel_operation_timeout_config(Config, Timeout) ->
+ [ok = Ret
+ || Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
+ application, set_env, [rabbit, channel_operation_timeout, Timeout])],
+ ok.
+
+%% Swap the backing queue module on every node for the stalling test queue
+%% (channel_operation_timeout_test_queue) so operations can be delayed.
+set_channel_operation_backing_queue(Config) ->
+ [ok = Ret
+ || Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
+ application, set_env,
+ [rabbit, backing_queue_module, channel_operation_timeout_test_queue])],
+ ok.
+
+%% Re-enable the priority queue BQ wrapper on all nodes after the backing
+%% queue module was replaced above.
+re_enable_priority_queue(Config) ->
+ [ok = Ret
+ || Ret <- rabbit_ct_broker_helpers:rpc_all(Config,
+ rabbit_priority_queue, enable, [])],
+ ok.
+
+%% Declare the durable queue and exchange described by the QCfg proplist
+%% (see qconfig/5), bind the queue to the exchange using the queue name as
+%% routing key, then optionally start a consumer.
+declare(QCfg) ->
+ QDeclare = #'queue.declare'{queue = Q = pget(name, QCfg), durable = true},
+ #'queue.declare_ok'{} = amqp_channel:call(Ch = pget(ch, QCfg), QDeclare),
+
+ ExDeclare = #'exchange.declare'{exchange = Ex = pget(ex, QCfg)},
+ #'exchange.declare_ok'{} = amqp_channel:call(Ch, ExDeclare),
+
+ #'queue.bind_ok'{} =
+ amqp_channel:call(Ch, #'queue.bind'{queue = Q,
+ exchange = Ex,
+ routing_key = Q}),
+ maybe_subscribe(QCfg).
+
+%% If the config has {consume, true}, subscribe a freshly spawned consumer
+%% process to the queue. The `deliver` flag controls whether the consumer
+%% loops on deliveries or blocks forever (see consume/2).
+maybe_subscribe(QCfg) ->
+ case pget(consume, QCfg) of
+ true ->
+ Sub = #'basic.consume'{queue = pget(name, QCfg)},
+ Ch = pget(ch, QCfg),
+ Del = pget(deliver, QCfg),
+ amqp_channel:subscribe(Ch, Sub,
+ spawn(fun() -> consume(Ch, Del) end));
+ _ -> ok
+ end.
+
+%% Consumer loop. With Deliver = false the process blocks forever
+%% (receive_nothing/0); with Deliver = true it drains basic.deliver
+%% messages indefinitely.
+consume(_Ch, false) -> receive_nothing();
+consume(Ch, Deliver = true) ->
+ receive
+ {#'basic.deliver'{}, _Msg} ->
+ consume(Ch, Deliver)
+ end.
+
+%% Publish Msg to the exchange in QCfg, routed by the queue name.
+%% Uses amqp_channel:call/3, so it synchronises with the channel.
+publish(QCfg, Msg) ->
+ Publish = #'basic.publish'{exchange = pget(ex, QCfg),
+ routing_key = pget(name, QCfg)},
+ amqp_channel:call(pget(ch, QCfg), Publish,
+ #amqp_msg{payload = Msg}).
+
+%% RPC to Node for the list of all consumers in VHost.
+get_consumers(Config, Node, VHost) when is_atom(Node),
+ is_binary(VHost) ->
+ rabbit_ct_broker_helpers:rpc(Config, Node,
+ rabbit_amqqueue, consumers_all, [VHost]).
+
+%% Find the #amqqueue{} record named Q in a list; throws {not_found, Q}
+%% if absent.
+get_amqqueue(Q, []) -> throw({not_found, Q});
+get_amqqueue(Q, [AMQQ = #amqqueue{name = Q} | _]) -> AMQQ;
+get_amqqueue(Q, [_| Rem]) -> get_amqqueue(Q, Rem).
+
+%% Build the queue-configuration proplist consumed by declare/1,
+%% maybe_subscribe/1 and publish/2.
+qconfig(Ch, Name, Ex, Consume, Deliver) ->
+ [{ch, Ch}, {name, Name}, {ex,Ex}, {consume, Consume}, {deliver, Deliver}].
+
+%% Block the calling process forever (used by non-delivering consumers).
+receive_nothing() ->
+ receive
+ after infinity -> void
+ end.
+
+%% Run Fun(), treating a shutdown exit carrying an AMQP ?NOT_FOUND reply
+%% code as success; any other exception is returned as {error, Reason}.
+%% NOTE(review): _:Reason drops the exception class (throw/error/exit) —
+%% callers only see the reason term.
+unhandled_req(Fun) ->
+ try
+ Fun()
+ catch
+ exit:{{shutdown,{_, ?NOT_FOUND, _}}, _} -> ok;
+ _:Reason -> {error, Reason}
+ end.
+
+%% Install the stalling backing queue on all nodes: swap the BQ module,
+%% re-enable the priority queue wrapper, and push this module's code path
+%% to every node so the test BQ module is loadable there.
+configure_bq(Config) ->
+ ok = set_channel_operation_backing_queue(Config),
+ ok = re_enable_priority_queue(Config),
+ ok = rabbit_ct_broker_helpers:add_code_path_to_all_nodes(Config,
+ ?MODULE).
diff --git a/test/channel_operation_timeout_test_queue.erl b/test/channel_operation_timeout_test_queue.erl
new file mode 100644
index 0000000000..55cd5f42fa
--- /dev/null
+++ b/test/channel_operation_timeout_test_queue.erl
@@ -0,0 +1,2443 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(channel_operation_timeout_test_queue).
+
+-export([init/3, terminate/2, delete_and_terminate/2, delete_crashed/1,
+ purge/1, purge_acks/1,
+ publish/6, publish_delivered/5,
+ batch_publish/4, batch_publish_delivered/4,
+ discard/4, drain_confirmed/1,
+ dropwhile/2, fetchwhile/4, fetch/2, drop/2, ack/2, requeue/2,
+ ackfold/4, fold/3, len/1, is_empty/1, depth/1,
+ set_ram_duration_target/2, ram_duration/1, needs_timeout/1, timeout/1,
+ handle_pre_hibernate/1, resume/1, msg_rates/1,
+ info/2, invoke/3, is_duplicate/2, set_queue_mode/2,
+ zip_msgs_and_acks/4, multiple_routing_keys/0]).
+
+-export([start/1, stop/0]).
+
+%% exported for testing only
+-export([start_msg_store/2, stop_msg_store/0, init/6]).
+
+%%----------------------------------------------------------------------------
+%% This test backing queue follows the variable queue implementation, with
+%% the exception that it will introduce infinite delays on some operations if
+%% the test message has been published, and is awaiting acknowledgement in the
+%% queue index. Test message is "timeout_test_msg!".
+%%
+%%----------------------------------------------------------------------------
+
+-behaviour(rabbit_backing_queue).
+
+-record(vqstate,
+ { q1,
+ q2,
+ delta,
+ q3,
+ q4,
+ next_seq_id,
+ ram_pending_ack, %% msgs using store, still in RAM
+ disk_pending_ack, %% msgs in store, paged out
+ qi_pending_ack, %% msgs using qi, *can't* be paged out
+ index_state,
+ msg_store_clients,
+ durable,
+ transient_threshold,
+ qi_embed_msgs_below,
+
+ len, %% w/o unacked
+ bytes, %% w/o unacked
+ unacked_bytes,
+ persistent_count, %% w unacked
+ persistent_bytes, %% w unacked
+
+ target_ram_count,
+ ram_msg_count, %% w/o unacked
+ ram_msg_count_prev,
+ ram_ack_count_prev,
+ ram_bytes, %% w unacked
+ out_counter,
+ in_counter,
+ rates,
+ msgs_on_disk,
+ msg_indices_on_disk,
+ unconfirmed,
+ confirmed,
+ ack_out_counter,
+ ack_in_counter,
+ %% Unlike the other counters these two do not feed into
+ %% #rates{} and get reset
+ disk_read_count,
+ disk_write_count,
+
+ io_batch_size,
+
+ %% default queue or lazy queue
+ mode
+ }).
+
+-record(rates, { in, out, ack_in, ack_out, timestamp }).
+
+-record(msg_status,
+ { seq_id,
+ msg_id,
+ msg,
+ is_persistent,
+ is_delivered,
+ msg_in_store,
+ index_on_disk,
+ persist_to,
+ msg_props
+ }).
+
+-record(delta,
+ { start_seq_id, %% start_seq_id is inclusive
+ count,
+ end_seq_id %% end_seq_id is exclusive
+ }).
+
+-define(HEADER_GUESS_SIZE, 100). %% see determine_persist_to/2
+-define(PERSISTENT_MSG_STORE, msg_store_persistent).
+-define(TRANSIENT_MSG_STORE, msg_store_transient).
+-define(QUEUE, lqueue).
+-define(TIMEOUT_TEST_MSG, <<"timeout_test_msg!">>).
+
+-include("rabbit.hrl").
+-include("rabbit_framing.hrl").
+
+%%----------------------------------------------------------------------------
+
+-rabbit_upgrade({multiple_routing_keys, local, []}).
+
+-ifdef(use_specs).
+
+-type(seq_id() :: non_neg_integer()).
+
+-type(rates() :: #rates { in :: float(),
+ out :: float(),
+ ack_in :: float(),
+ ack_out :: float(),
+ timestamp :: rabbit_types:timestamp()}).
+
+-type(delta() :: #delta { start_seq_id :: non_neg_integer(),
+ count :: non_neg_integer(),
+ end_seq_id :: non_neg_integer() }).
+
+%% The compiler (rightfully) complains that ack() and state() are
+%% unused. For this reason we duplicate a -spec from
+%% rabbit_backing_queue with the only intent being to remove
+%% warnings. The problem here is that we can't parameterise the BQ
+%% behaviour by these two types as we would like to. We still leave
+%% these here for documentation purposes.
+-type(ack() :: seq_id()).
+-type(state() :: #vqstate {
+ q1 :: ?QUEUE:?QUEUE(),
+ q2 :: ?QUEUE:?QUEUE(),
+ delta :: delta(),
+ q3 :: ?QUEUE:?QUEUE(),
+ q4 :: ?QUEUE:?QUEUE(),
+ next_seq_id :: seq_id(),
+ ram_pending_ack :: gb_trees:tree(),
+ disk_pending_ack :: gb_trees:tree(),
+ qi_pending_ack :: gb_trees:tree(),
+ index_state :: any(),
+ msg_store_clients :: 'undefined' | {{any(), binary()},
+ {any(), binary()}},
+ durable :: boolean(),
+ transient_threshold :: non_neg_integer(),
+ qi_embed_msgs_below :: non_neg_integer(),
+
+ len :: non_neg_integer(),
+ bytes :: non_neg_integer(),
+ unacked_bytes :: non_neg_integer(),
+
+ persistent_count :: non_neg_integer(),
+ persistent_bytes :: non_neg_integer(),
+
+ target_ram_count :: non_neg_integer() | 'infinity',
+ ram_msg_count :: non_neg_integer(),
+ ram_msg_count_prev :: non_neg_integer(),
+ ram_ack_count_prev :: non_neg_integer(),
+ ram_bytes :: non_neg_integer(),
+ out_counter :: non_neg_integer(),
+ in_counter :: non_neg_integer(),
+ rates :: rates(),
+ msgs_on_disk :: gb_sets:set(),
+ msg_indices_on_disk :: gb_sets:set(),
+ unconfirmed :: gb_sets:set(),
+ confirmed :: gb_sets:set(),
+ ack_out_counter :: non_neg_integer(),
+ ack_in_counter :: non_neg_integer(),
+ disk_read_count :: non_neg_integer(),
+ disk_write_count :: non_neg_integer(),
+
+ io_batch_size :: pos_integer(),
+ mode :: 'default' | 'lazy' }).
+%% Duplicated from rabbit_backing_queue
+-spec(ack/2 :: ([ack()], state()) -> {[rabbit_guid:guid()], state()}).
+
+-spec(multiple_routing_keys/0 :: () -> 'ok').
+
+-endif.
+
+-define(BLANK_DELTA, #delta { start_seq_id = undefined,
+ count = 0,
+ end_seq_id = undefined }).
+-define(BLANK_DELTA_PATTERN(Z), #delta { start_seq_id = Z,
+ count = 0,
+ end_seq_id = Z }).
+
+-define(MICROS_PER_SECOND, 1000000.0).
+
+%% We're sampling every 5s for RAM duration; a half life that is of
+%% the same order of magnitude is probably about right.
+-define(RATE_AVG_HALF_LIFE, 5.0).
+
+%% We will recalculate the #rates{} every time we get asked for our
+%% RAM duration, or every N messages published, whichever is
+%% sooner. We do this since the priority calculations in
+%% rabbit_amqqueue_process need fairly fresh rates.
+-define(MSGS_PER_RATE_CALC, 100).
+
+%%----------------------------------------------------------------------------
+%% Public API
+%%----------------------------------------------------------------------------
+
+%% Start the queue index for the given durable queues, then start the two
+%% message stores, passing along the persistent refs recovered from each
+%% queue's terms (skipping queues that did not shut down cleanly).
+start(DurableQueues) ->
+ {AllTerms, StartFunState} = rabbit_queue_index:start(DurableQueues),
+ start_msg_store(
+ [Ref || Terms <- AllTerms,
+ Terms /= non_clean_shutdown,
+ begin
+ Ref = proplists:get_value(persistent_ref, Terms),
+ Ref =/= undefined
+ end],
+ StartFunState),
+ {ok, AllTerms}.
+
+%% Stop the message stores, then the queue index.
+stop() ->
+ ok = stop_msg_store(),
+ ok = rabbit_queue_index:stop().
+
+%% Start the transient and persistent msg_store children under rabbit_sup.
+%% The transient store needs no recovery (its recovery fun finishes
+%% immediately); the persistent store recovers from Refs/StartFunState.
+start_msg_store(Refs, StartFunState) ->
+ ok = rabbit_sup:start_child(?TRANSIENT_MSG_STORE, rabbit_msg_store,
+ [?TRANSIENT_MSG_STORE, rabbit_mnesia:dir(),
+ undefined, {fun (ok) -> finished end, ok}]),
+ ok = rabbit_sup:start_child(?PERSISTENT_MSG_STORE, rabbit_msg_store,
+ [?PERSISTENT_MSG_STORE, rabbit_mnesia:dir(),
+ Refs, StartFunState]).
+
+%% Stop both msg_store children (reverse start order).
+stop_msg_store() ->
+ ok = rabbit_sup:stop_child(?PERSISTENT_MSG_STORE),
+ ok = rabbit_sup:stop_child(?TRANSIENT_MSG_STORE).
+
+%% rabbit_backing_queue init/3 callback: delegates to init/6, wiring up
+%% the three on-disk-confirmation callbacks around the async Callback.
+init(Queue, Recover, Callback) ->
+ init(
+ Queue, Recover, Callback,
+ fun (MsgIds, ActionTaken) ->
+ msgs_written_to_disk(Callback, MsgIds, ActionTaken)
+ end,
+ fun (MsgIds) -> msg_indices_written_to_disk(Callback, MsgIds) end,
+ fun (MsgIds) -> msgs_and_indices_written_to_disk(Callback, MsgIds) end).
+
+%% Fresh (non-recovery) initialisation: new index state, zero counts, and
+%% a persistent msg_store client only for durable queues.
+init(#amqqueue { name = QueueName, durable = IsDurable }, new,
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ IndexState = rabbit_queue_index:init(QueueName,
+ MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ init(IsDurable, IndexState, 0, 0, [],
+ case IsDurable of
+ true -> msg_store_client_init(?PERSISTENT_MSG_STORE,
+ MsgOnDiskFun, AsyncCallback);
+ false -> undefined
+ end,
+ msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, AsyncCallback));
+
+%% We can be recovering a transient queue if it crashed
+init(#amqqueue { name = QueueName, durable = IsDurable }, Terms,
+ AsyncCallback, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) ->
+ {PRef, RecoveryTerms} = process_recovery_terms(Terms),
+ {PersistentClient, ContainsCheckFun} =
+ case IsDurable of
+ true -> C = msg_store_client_init(?PERSISTENT_MSG_STORE, PRef,
+ MsgOnDiskFun, AsyncCallback),
+ {C, fun (MsgId) when is_binary(MsgId) ->
+ rabbit_msg_store:contains(MsgId, C);
+ (#basic_message{is_persistent = Persistent}) ->
+ Persistent
+ end};
+ false -> {undefined, fun(_MsgId) -> false end}
+ end,
+ TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE,
+ undefined, AsyncCallback),
+ {DeltaCount, DeltaBytes, IndexState} =
+ rabbit_queue_index:recover(
+ QueueName, RecoveryTerms,
+ rabbit_msg_store:successfully_recovered_state(?PERSISTENT_MSG_STORE),
+ ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun),
+ init(IsDurable, IndexState, DeltaCount, DeltaBytes, RecoveryTerms,
+ PersistentClient, TransientClient).
+
+%% Extract the persistent msg_store client ref from recovery terms.
+%% After an unclean shutdown (or when the ref is missing) a fresh ref is
+%% generated and the usable terms are reduced accordingly.
+process_recovery_terms(Terms=non_clean_shutdown) ->
+ {rabbit_guid:gen(), Terms};
+process_recovery_terms(Terms) ->
+ case proplists:get_value(persistent_ref, Terms) of
+ undefined -> {rabbit_guid:gen(), []};
+ PRef -> {PRef, Terms}
+ end.
+
+%% Clean shutdown: flush pending acks to disk, terminate the persistent
+%% msg_store client (keeping its ref for recovery), delete the transient
+%% client, and hand recovery terms to the queue index.
+terminate(_Reason, State) ->
+ State1 = #vqstate { persistent_count = PCount,
+ persistent_bytes = PBytes,
+ index_state = IndexState,
+ msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack(true, State),
+ PRef = case MSCStateP of
+ undefined -> undefined;
+ _ -> ok = rabbit_msg_store:client_terminate(MSCStateP),
+ rabbit_msg_store:client_ref(MSCStateP)
+ end,
+ ok = rabbit_msg_store:client_delete_and_terminate(MSCStateT),
+ Terms = [{persistent_ref, PRef},
+ {persistent_count, PCount},
+ {persistent_bytes, PBytes}],
+ a(State1 #vqstate { index_state = rabbit_queue_index:terminate(
+ Terms, IndexState),
+ msg_store_clients = undefined }).
+
+%% the only difference between purge and delete is that delete also
+%% needs to delete everything that's been delivered and not ack'd.
+delete_and_terminate(_Reason, State) ->
+ %% Normally when we purge messages we interact with the qi by
+ %% issuing delivers and acks for every purged message. In this case
+ %% we don't need to do that, so we just delete the qi.
+ State1 = purge_and_index_reset(State),
+ State2 = #vqstate { msg_store_clients = {MSCStateP, MSCStateT} } =
+ purge_pending_ack_delete_and_terminate(State1),
+ case MSCStateP of
+ undefined -> ok;
+ _ -> rabbit_msg_store:client_delete_and_terminate(MSCStateP)
+ end,
+ rabbit_msg_store:client_delete_and_terminate(MSCStateT),
+ a(State2 #vqstate { msg_store_clients = undefined }).
+
+%% Erase the on-disk index of a queue that crashed.
+delete_crashed(#amqqueue{name = QName}) ->
+ ok = rabbit_queue_index:erase(QName).
+
+%% Purge the queue of ready messages, returning the purged length.
+%% maybe_delay/1 is this test BQ's hook: it stalls here when the timeout
+%% test message is pending ack in the queue index.
+purge(State = #vqstate { len = Len, qi_pending_ack= QPA }) ->
+ maybe_delay(QPA),
+ case is_pending_ack_empty(State) of
+ true ->
+ {Len, purge_and_index_reset(State)};
+ false ->
+ {Len, purge_when_pending_acks(State)}
+ end.
+
+%% Drop all pending acks without requeueing them.
+purge_acks(State) -> a(purge_pending_ack(false, State)).
+
+%% Publish one message into the queue; may write to disk and triggers
+%% memory reduction and rate bookkeeping.
+publish(Msg, MsgProps, IsDelivered, ChPid, Flow, State) ->
+ State1 =
+ publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ a(reduce_memory_use(maybe_update_rates(State1))).
+
+%% Publish a batch of messages; index writes are buffered per message and
+%% flushed once at the end via ui/1.
+batch_publish(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, State1} =
+ lists:foldl(fun batch_publish1/2, {ChPid, Flow, State}, Publishes),
+ State2 = ui(State1),
+ a(reduce_memory_use(maybe_update_rates(State2))).
+
+%% Publish a message that is already delivered (consumer waiting);
+%% returns the SeqId to use as its ack tag.
+publish_delivered(Msg, MsgProps, ChPid, Flow, State) ->
+ {SeqId, State1} =
+ publish_delivered1(Msg, MsgProps, ChPid, Flow,
+ fun maybe_write_to_disk/4,
+ State),
+ {SeqId, a(reduce_memory_use(maybe_update_rates(State1)))}.
+
+%% Batch variant of publish_delivered/5; SeqIds are accumulated in reverse
+%% and restored to publish order before returning.
+batch_publish_delivered(Publishes, ChPid, Flow, State) ->
+ {ChPid, Flow, SeqIds, State1} =
+ lists:foldl(fun batch_publish_delivered1/2,
+ {ChPid, Flow, [], State}, Publishes),
+ State2 = ui(State1),
+ {lists:reverse(SeqIds), a(reduce_memory_use(maybe_update_rates(State2)))}.
+
+%% Nothing to do on discard for this BQ implementation.
+discard(_MsgId, _ChPid, _Flow, State) -> State.
+
+%% Hand back (and clear) the set of msg ids confirmed since the last call.
+drain_confirmed(State = #vqstate { confirmed = C }) ->
+ case gb_sets:is_empty(C) of
+ true -> {[], State}; %% common case
+ false -> {gb_sets:to_list(C), State #vqstate {
+ confirmed = gb_sets:new() }}
+ end.
+
+%% Drop messages from the head while Pred holds; returns the properties of
+%% the first message that failed the predicate (or undefined-equivalent,
+%% per remove_by_predicate/2).
+dropwhile(Pred, State) ->
+ {MsgProps, State1} =
+ remove_by_predicate(Pred, State),
+ {MsgProps, a(State1)}.
+
+%% Like dropwhile/2 but folds Fun over each fetched message, threading Acc.
+fetchwhile(Pred, Fun, Acc, State) ->
+ {MsgProps, Acc1, State1} =
+ fetch_by_predicate(Pred, Fun, Acc, State),
+ {MsgProps, Acc1, a(State1)}.
+
+%% Fetch the head message, reading its body from disk if necessary;
+%% returns {Msg, IsDelivered, AckTag} or empty.
+fetch(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ %% it is possible that the message wasn't read from disk
+ %% at this point, so read it in.
+ {Msg, State2} = read_msg(MsgStatus, State1),
+ {AckTag, State3} = remove(AckRequired, MsgStatus, State2),
+ {{Msg, MsgStatus#msg_status.is_delivered, AckTag}, a(State3)}
+ end.
+
+%% Like fetch/2 but never reads the message body; returns only the msg id
+%% and ack tag.
+drop(AckRequired, State) ->
+ case queue_out(State) of
+ {empty, State1} ->
+ {empty, a(State1)};
+ {{value, MsgStatus}, State1} ->
+ {AckTag, State2} = remove(AckRequired, MsgStatus, State1),
+ {{MsgStatus#msg_status.msg_id, AckTag}, a(State2)}
+ end.
+
+%% Acknowledge the given ack tags (SeqIds): remove them from pending-ack
+%% tracking, ack them in the queue index and remove bodies from the msg
+%% store as needed. Returns the acked msg ids.
+ack([], State) ->
+ {[], State};
+%% optimisation: this head is essentially a partial evaluation of the
+%% general case below, for the single-ack case.
+ack([SeqId], State) ->
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} =
+ remove_pending_ack(true, SeqId, State),
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + 1 })};
+%% General case: accumulate index acks and per-store msg removals across
+%% all tags, then apply them in bulk.
+ack(AckTags, State) ->
+ {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} =
+ lists:foldl(
+ fun (SeqId, {Acc, State2}) ->
+ {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end, {accumulate_ack_init(), State}, AckTags),
+ IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+ remove_msgs_by_id(MsgIdsByStore, MSCState),
+ {lists:reverse(AllMsgIds),
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + length(AckTags) })}.
+
+%% Requeue pending-ack messages back onto the queue. In default mode the
+%% sorted tags are merged into q4 (alphas), then q3 (betas), then delta;
+%% in lazy mode q4 is unused so merging goes straight to q3 then delta.
+%% maybe_delay/1 is this test BQ's stall hook (blocks while the timeout
+%% test message is pending ack in the qi).
+requeue(AckTags, #vqstate { mode = default,
+ delta = Delta,
+ q3 = Q3,
+ q4 = Q4,
+ in_counter = InCounter,
+ len = Len,
+ qi_pending_ack = QPA } = State) ->
+ maybe_delay(QPA),
+ {SeqIds, Q4a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q4, [],
+ beta_limit(Q3),
+ fun publish_alpha/2, State),
+ {SeqIds1, Q3a, MsgIds1, State2} = queue_merge(SeqIds, Q3, MsgIds,
+ delta_limit(Delta),
+ fun publish_beta/2, State1),
+ {Delta1, MsgIds2, State3} = delta_merge(SeqIds1, Delta, MsgIds1,
+ State2),
+ MsgCount = length(MsgIds2),
+ {MsgIds2, a(reduce_memory_use(
+ maybe_update_rates(
+ State3 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ q4 = Q4a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))};
+requeue(AckTags, #vqstate { mode = lazy,
+ delta = Delta,
+ q3 = Q3,
+ in_counter = InCounter,
+ len = Len,
+ qi_pending_ack = QPA } = State) ->
+ maybe_delay(QPA),
+ {SeqIds, Q3a, MsgIds, State1} = queue_merge(lists:sort(AckTags), Q3, [],
+ delta_limit(Delta),
+ fun publish_beta/2, State),
+ {Delta1, MsgIds1, State2} = delta_merge(SeqIds, Delta, MsgIds,
+ State1),
+ MsgCount = length(MsgIds1),
+ {MsgIds1, a(reduce_memory_use(
+ maybe_update_rates(
+ State2 #vqstate { delta = Delta1,
+ q3 = Q3a,
+ in_counter = InCounter + MsgCount,
+ len = Len + MsgCount })))}.
+
+%% Fold MsgFun over the messages referenced by AckTags (reading bodies in
+%% from disk as needed) without acking them.
+ackfold(MsgFun, Acc, State, AckTags) ->
+ {AccN, StateN} =
+ lists:foldl(fun(SeqId, {Acc0, State0}) ->
+ MsgStatus = lookup_pending_ack(SeqId, State0),
+ {Msg, State1} = read_msg(MsgStatus, State0),
+ {MsgFun(Msg, SeqId, Acc0), State1}
+ end, {Acc, State}, AckTags),
+ {AccN, a(StateN)}.
+
+%% Fold Fun over every message in the queue, merging iterators over the
+%% in-memory queues and all three pending-ack collections.
+fold(Fun, Acc, State = #vqstate{index_state = IndexState}) ->
+ {Its, IndexState1} = lists:foldl(fun inext/2, {[], IndexState},
+ [msg_iterator(State),
+ disk_ack_iterator(State),
+ ram_ack_iterator(State),
+ qi_ack_iterator(State)]),
+ ifold(Fun, Acc, Its, State#vqstate{index_state = IndexState1}).
+
+%% Queue length (ready messages only). Stalls via maybe_delay/1 while the
+%% timeout test message is pending ack — that is this test BQ's purpose.
+len(#vqstate { len = Len, qi_pending_ack = QPA }) ->
+ maybe_delay(QPA),
+ Len.
+
+%% True iff no ready messages.
+is_empty(State) -> 0 == len(State).
+
+%% Ready messages plus unacked messages.
+depth(State) ->
+ len(State) + count_pending_acks(State).
+
+%% Convert a RAM duration target (seconds, or infinity) into a target
+%% message count using the current combined in/out/ack rates, and shrink
+%% memory use immediately if the new target is lower than the old one.
+set_ram_duration_target(
+ DurationTarget, State = #vqstate {
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ target_ram_count = TargetRamCount }) ->
+ Rate =
+ AvgEgressRate + AvgIngressRate + AvgAckEgressRate + AvgAckIngressRate,
+ TargetRamCount1 =
+ case DurationTarget of
+ infinity -> infinity;
+ _ -> trunc(DurationTarget * Rate) %% msgs = sec * msgs/sec
+ end,
+ State1 = State #vqstate { target_ram_count = TargetRamCount1 },
+ a(case TargetRamCount1 == infinity orelse
+ (TargetRamCount =/= infinity andalso
+ TargetRamCount1 >= TargetRamCount) of
+ true -> State1;
+ false -> reduce_memory_use(State1)
+ end).
+
+%% Recalculate rates only after enough messages have flowed
+%% (?MSGS_PER_RATE_CALC) to keep the bookkeeping cheap.
+maybe_update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount })
+ when InCount + OutCount > ?MSGS_PER_RATE_CALC ->
+ update_rates(State);
+maybe_update_rates(State) ->
+ State.
+
+%% Recompute all four moving-average rates from the counters accumulated
+%% since the previous timestamp, then reset the counters.
+update_rates(State = #vqstate{ in_counter = InCount,
+ out_counter = OutCount,
+ ack_in_counter = AckInCount,
+ ack_out_counter = AckOutCount,
+ rates = #rates{ in = InRate,
+ out = OutRate,
+ ack_in = AckInRate,
+ ack_out = AckOutRate,
+ timestamp = TS }}) ->
+ Now = time_compat:monotonic_time(),
+
+ Rates = #rates { in = update_rate(Now, TS, InCount, InRate),
+ out = update_rate(Now, TS, OutCount, OutRate),
+ ack_in = update_rate(Now, TS, AckInCount, AckInRate),
+ ack_out = update_rate(Now, TS, AckOutCount, AckOutRate),
+ timestamp = Now },
+
+ State#vqstate{ in_counter = 0,
+ out_counter = 0,
+ ack_in_counter = 0,
+ ack_out_counter = 0,
+ rates = Rates }.
+
+%% Exponential moving average of Count/Time against the previous Rate;
+%% keeps the old rate unchanged when no time has elapsed (avoids div by 0).
+update_rate(Now, TS, Count, Rate) ->
+ Time = time_compat:convert_time_unit(Now - TS, native, micro_seconds) /
+ ?MICROS_PER_SECOND,
+ if
+ Time == 0 -> Rate;
+ true -> rabbit_misc:moving_average(Time, ?RATE_AVG_HALF_LIFE,
+ Count / Time, Rate)
+ end.
+
+%% Estimate how long (seconds) the messages currently in RAM would last at
+%% the averaged in/out/ack rates; infinity when all rates are negligible.
+ram_duration(State) ->
+ State1 = #vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate },
+ ram_msg_count = RamMsgCount,
+ ram_msg_count_prev = RamMsgCountPrev,
+ ram_pending_ack = RPA,
+ qi_pending_ack = QPA,
+ ram_ack_count_prev = RamAckCountPrev } =
+ update_rates(State),
+
+ RamAckCount = gb_trees:size(RPA) + gb_trees:size(QPA),
+
+ Duration = %% msgs+acks / (msgs+acks/sec) == sec
+ case lists:all(fun (X) -> X < 0.01 end,
+ [AvgEgressRate, AvgIngressRate,
+ AvgAckEgressRate, AvgAckIngressRate]) of
+ true -> infinity;
+ false -> (RamMsgCountPrev + RamMsgCount +
+ RamAckCount + RamAckCountPrev) /
+ (4 * (AvgEgressRate + AvgIngressRate +
+ AvgAckEgressRate + AvgAckIngressRate))
+ end,
+
+ {Duration, State1}.
+
+%% Map the index's sync needs onto the BQ timeout protocol:
+%% confirms pending -> timed, other work -> idle, nothing -> false.
+needs_timeout(#vqstate { index_state = IndexState }) ->
+ case rabbit_queue_index:needs_sync(IndexState) of
+ confirms -> timed;
+ other -> idle;
+ false -> false
+ end.
+
+%% Sync the queue index on timeout.
+timeout(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:sync(IndexState) }.
+
+%% Flush the index before the owning process hibernates.
+handle_pre_hibernate(State = #vqstate { index_state = IndexState }) ->
+ State #vqstate { index_state = rabbit_queue_index:flush(IndexState) }.
+
+%% Resume background work (e.g. paging after credit arrives).
+resume(State) -> a(reduce_memory_use(State)).
+
+%% Expose the averaged publish (in) and deliver (out) rates.
+msg_rates(#vqstate { rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate } }) ->
+ {AvgIngressRate, AvgEgressRate}.
+
+%% Report a named statistic about the queue state; throws {bad_argument,
+%% Item} for unknown items. Used by queue stats/management reporting.
+info(messages_ready_ram, #vqstate{ram_msg_count = RamMsgCount}) ->
+ RamMsgCount;
+info(messages_unacknowledged_ram, #vqstate{ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ gb_trees:size(RPA) + gb_trees:size(QPA);
+info(messages_ram, State) ->
+ info(messages_ready_ram, State) + info(messages_unacknowledged_ram, State);
+info(messages_persistent, #vqstate{persistent_count = PersistentCount}) ->
+ PersistentCount;
+info(message_bytes, #vqstate{bytes = Bytes,
+ unacked_bytes = UBytes}) ->
+ Bytes + UBytes;
+info(message_bytes_ready, #vqstate{bytes = Bytes}) ->
+ Bytes;
+info(message_bytes_unacknowledged, #vqstate{unacked_bytes = UBytes}) ->
+ UBytes;
+info(message_bytes_ram, #vqstate{ram_bytes = RamBytes}) ->
+ RamBytes;
+info(message_bytes_persistent, #vqstate{persistent_bytes = PersistentBytes}) ->
+ PersistentBytes;
+info(head_message_timestamp, #vqstate{
+ q3 = Q3,
+ q4 = Q4,
+ ram_pending_ack = RPA,
+ qi_pending_ack = QPA}) ->
+ head_message_timestamp(Q3, Q4, RPA, QPA);
+info(disk_reads, #vqstate{disk_read_count = Count}) ->
+ Count;
+info(disk_writes, #vqstate{disk_write_count = Count}) ->
+ Count;
+info(backing_queue_status, #vqstate {
+ q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
+ mode = Mode,
+ len = Len,
+ target_ram_count = TargetRamCount,
+ next_seq_id = NextSeqId,
+ rates = #rates { in = AvgIngressRate,
+ out = AvgEgressRate,
+ ack_in = AvgAckIngressRate,
+ ack_out = AvgAckEgressRate }}) ->
+
+ [ {mode , Mode},
+ {q1 , ?QUEUE:len(Q1)},
+ {q2 , ?QUEUE:len(Q2)},
+ {delta , Delta},
+ {q3 , ?QUEUE:len(Q3)},
+ {q4 , ?QUEUE:len(Q4)},
+ {len , Len},
+ {target_ram_count , TargetRamCount},
+ {next_seq_id , NextSeqId},
+ {avg_ingress_rate , AvgIngressRate},
+ {avg_egress_rate , AvgEgressRate},
+ {avg_ack_ingress_rate, AvgAckIngressRate},
+ {avg_ack_egress_rate , AvgAckEgressRate} ];
+info(Item, _) ->
+ throw({bad_argument, Item}).
+
+%% Run Fun against the state only when addressed to this module;
+%% invocations for other BQ modules are ignored.
+invoke(?MODULE, Fun, State) -> Fun(?MODULE, State);
+invoke( _, _, State) -> State.
+
+%% This BQ performs no duplicate detection.
+is_duplicate(_Msg, State) -> {false, State}.
+
+%% Switch between default and lazy queue modes. No-op when already in the
+%% requested mode; becoming lazy pages everything to disk first, becoming
+%% default reloads messages from disk as on recovery.
+set_queue_mode(Mode, State = #vqstate { mode = Mode }) ->
+ State;
+set_queue_mode(lazy, State = #vqstate {
+ target_ram_count = TargetRamCount }) ->
+ %% To become a lazy queue we need to page everything to disk first.
+ State1 = convert_to_lazy(State),
+ %% restore the original target_ram_count
+ a(State1 #vqstate { mode = lazy, target_ram_count = TargetRamCount });
+set_queue_mode(default, State) ->
+ %% becoming a default queue means loading messages from disk like
+ %% when a queue is recovered.
+ a(maybe_deltas_to_betas(State #vqstate { mode = default }));
+set_queue_mode(_, State) ->
+ State.
+
+%% Pair each message's id with its ack tag; Msgs and AckTags must be the
+%% same length (lists:zip/2 enforces this). Result is prepended to
+%% Accumulator in reverse order.
+zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) ->
+ lists:foldl(fun ({{#basic_message{ id = Id }, _Props}, AckTag}, Acc) ->
+ [{Id, AckTag} | Acc]
+ end, Accumulator, lists:zip(Msgs, AckTags)).
+
%% Page the entire queue contents to disk by forcing the RAM duration
%% target to zero, looping until everything ready is accounted for by
%% delta plus q3. Used when switching a queue into 'lazy' mode.
convert_to_lazy(State) ->
    State1 = #vqstate { delta = Delta, q3 = Q3, len = Len } =
        set_ram_duration_target(0, State),
    %% Fully paged out when every ready message lives in delta or q3.
    case Delta#delta.count + ?QUEUE:len(Q3) == Len of
        true ->
            State1;
        false ->
            %% When pushing messages to disk, we might have been
            %% blocked by the msg_store, so we need to see if we have
            %% to wait for more credit, and then keep paging messages.
            %%
            %% The amqqueue_process could have taken care of this, but
            %% between the time it receives the bump_credit msg and
            %% calls BQ:resume to keep paging messages to disk, some
            %% other request may arrive to the BQ which at this moment
            %% is not in a proper state for a lazy BQ (unless all
            %% messages have been paged to disk already).
            wait_for_msg_store_credit(),
            convert_to_lazy(State1)
    end.
+
%% If we are currently flow-blocked by the message store, block until
%% a {bump_credit, _} message arrives and hand it to credit_flow;
%% return immediately otherwise.
wait_for_msg_store_credit() ->
    case credit_flow:blocked() of
        false -> ok;
        true  -> receive
                     {bump_credit, Msg} ->
                         credit_flow:handle_bump_msg(Msg)
                 end
    end.
+
%% Get the Timestamp property of the first msg, if present. This is
%% the one with the oldest timestamp among the heads of the pending
%% acks and unread queues. We can't check disk_pending_acks as these
%% are paged out - we assume some will soon be paged in rather than
%% forcing it to happen. Pending ack msgs are included as they are
%% regarded as unprocessed until acked, this also prevents the result
%% apparently oscillating during repeated rejects. Q3 is only checked
%% when Q4 is empty as any Q4 msg will be earlier.
%%
%% Returns the atom '' (empty atom) when no candidate message carries
%% a timestamp.
head_message_timestamp(Q3, Q4, RPA, QPA) ->
    %% Candidate heads whose payload is actually in RAM.
    HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
                   HeadMsgStatus <-
                       [ get_qs_head([Q4, Q3]),
                         get_pa_head(RPA),
                         get_pa_head(QPA) ],
                   HeadMsgStatus /= undefined,
                   HeadMsgStatus#msg_status.msg /= undefined ],

    %% Keep only candidates that carry a Timestamp property.
    Timestamps =
        [Timestamp || HeadMsg <- HeadMsgs,
                      Timestamp <- [rabbit_basic:extract_timestamp(
                                      HeadMsg#basic_message.content)],
                      Timestamp /= undefined
        ],

    case Timestamps == [] of
        true  -> '';
        false -> lists:min(Timestamps)
    end.
+
%% Return the head msg_status of the first non-empty queue in Qs, or
%% 'undefined' when all of them are empty.
%%
%% The previous implementation early-exited with throw/1 inside a
%% lists:foldl wrapped in an old-style 'catch'. That 'catch' would
%% also have converted a genuine error raised by get_q_head/1 into an
%% {'EXIT', Reason} return value, silently conflating errors with
%% results. Plain recursion expresses the early exit directly and
%% lets real errors propagate to the caller.
get_qs_head([]) ->
    undefined;
get_qs_head([Q | Qs]) ->
    case get_q_head(Q) of
        undefined -> get_qs_head(Qs);
        Val       -> Val
    end.
+
%% Head msg_status of a ?QUEUE, or 'undefined' when it is empty.
get_q_head(Q) ->
    get_collection_head(Q, fun ?QUEUE:is_empty/1, fun ?QUEUE:peek/1).
+
%% msg_status of the pending-ack entry with the smallest seq id, or
%% 'undefined' when the pending-ack gb_tree is empty.
get_pa_head(PA) ->
    get_collection_head(PA, fun gb_trees:is_empty/1, fun gb_trees:smallest/1).
+
%% Generic head accessor: 'undefined' for an empty collection,
%% otherwise the second element of the {Key, MsgStatus} pair produced
%% by GetVal. IsEmpty/GetVal abstract over ?QUEUE vs gb_trees.
get_collection_head(Col, IsEmpty, GetVal) ->
    case IsEmpty(Col) of
        true  -> undefined;
        false ->
            {_Key, MsgStatus} = GetVal(Col),
            MsgStatus
    end.
+
+%%----------------------------------------------------------------------------
+%% Minor helpers
+%%----------------------------------------------------------------------------
%% "a" for assert: validate the global invariants relating the four
%% in-RAM queues, delta and the counters, then return State unchanged.
%% One clause per queue mode, as the invariants differ. Any violated
%% invariant crashes with badmatch, which is intentional.
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode = default,
                     len = Len,
                     bytes = Bytes,
                     unacked_bytes = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count = RamMsgCount,
                     ram_bytes = RamBytes}) ->
    E1 = ?QUEUE:is_empty(Q1),
    E2 = ?QUEUE:is_empty(Q2),
    ED = Delta#delta.count == 0,
    E3 = ?QUEUE:is_empty(Q3),
    E4 = ?QUEUE:is_empty(Q4),
    LZ = Len == 0,

    %% if q1 has messages then q3 cannot be empty. See publish/6.
    true = E1 or not E3,
    %% if q2 has messages then we have messages in delta (paged to
    %% disk). See push_alphas_to_betas/2.
    true = E2 or not ED,
    %% if delta has messages then q3 cannot be empty. This is enforced
    %% by paging, where min([?SEGMENT_ENTRY_COUNT, len(q3)]) messages
    %% are always kept on RAM.
    true = ED or not E3,
    %% if the queue length is 0, then q3 and q4 must be empty.
    true = LZ == (E3 and E4),

    %% All counters are non-negative, and RAM usage can never exceed
    %% the corresponding totals.
    true = Len >= 0,
    true = Bytes >= 0,
    true = UnackedBytes >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount >= 0,
    true = RamMsgCount =< Len,
    true = RamBytes >= 0,
    true = RamBytes =< Bytes + UnackedBytes,

    State;
a(State = #vqstate { q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
                     mode = lazy,
                     len = Len,
                     bytes = Bytes,
                     unacked_bytes = UnackedBytes,
                     persistent_count = PersistentCount,
                     persistent_bytes = PersistentBytes,
                     ram_msg_count = RamMsgCount,
                     ram_bytes = RamBytes}) ->
    E1 = ?QUEUE:is_empty(Q1),
    E2 = ?QUEUE:is_empty(Q2),
    ED = Delta#delta.count == 0,
    E3 = ?QUEUE:is_empty(Q3),
    E4 = ?QUEUE:is_empty(Q4),
    LZ = Len == 0,
    L3 = ?QUEUE:len(Q3),

    %% q1 must always be empty, since q1 only gets messages during
    %% publish, but for lazy queues messages go straight to delta.
    true = E1,

    %% q2 only gets messages from q1 when push_alphas_to_betas is
    %% called for a non empty delta, which won't be the case for a
    %% lazy queue. This means q2 must always be empty.
    true = E2,

    %% q4 must always be empty, since q1 only gets messages during
    %% publish, but for lazy queues messages go straight to delta.
    true = E4,

    %% if the queue is empty, then delta is empty and q3 is empty.
    true = LZ == (ED and E3),

    %% There should be no messages in q1, q2, and q4
    true = Delta#delta.count + L3 == Len,

    true = Len >= 0,
    true = Bytes >= 0,
    true = UnackedBytes >= 0,
    true = PersistentCount >= 0,
    true = PersistentBytes >= 0,
    true = RamMsgCount >= 0,
    true = RamMsgCount =< Len,
    true = RamBytes >= 0,
    true = RamBytes =< Bytes + UnackedBytes,

    State.
+
%% "d" for delta-assert: check a #delta{} is sane -- the counted
%% entries must fit inside [start_seq_id, end_seq_id). Returns the
%% delta unchanged; an insane delta fails with function_clause.
d(Delta = #delta { start_seq_id = Start, count = Count, end_seq_id = End })
  when Start + Count =< End ->
    Delta.
+
%% "m" for msg_status-assert: a persistent message must have its
%% index entry on disk, and the payload must be available either in
%% RAM or in the message store. Returns the msg_status unchanged.
m(MsgStatus = #msg_status { is_persistent = IsPersistent,
                            msg_in_store = MsgInStore,
                            index_on_disk = IndexOnDisk }) ->
    true = (not IsPersistent) or IndexOnDisk,
    true = msg_in_ram(MsgStatus) or MsgInStore,
    MsgStatus.
+
%% Map a boolean onto an integer increment: 1 for true, 0 for false.
one_if(Bool) ->
    case Bool of
        true  -> 1;
        false -> 0
    end.
+
%% Prepend E onto L only when the condition holds.
cons_if(Cond, E, L) ->
    case Cond of
        true  -> [E | L];
        false -> L
    end.
+
%% Add Val to Set only when the flag is true. gb_sets:add/2 tolerates
%% an already-present element, so repeated inserts are harmless.
gb_sets_maybe_insert(Insert, Val, Set) ->
    case Insert of
        false -> Set;
        true  -> gb_sets:add(Val, Set)
    end.
+
%% Build the initial #msg_status{} for a freshly published message:
%% nothing is on disk yet, and the persistence target (msg store vs
%% embedded in the queue index) is decided up-front from the message
%% size against IndexMaxSize.
msg_status(IsPersistent, IsDelivered, SeqId,
           Msg = #basic_message {id = MsgId}, MsgProps, IndexMaxSize) ->
    #msg_status{seq_id = SeqId,
                msg_id = MsgId,
                msg = Msg,
                is_persistent = IsPersistent,
                is_delivered = IsDelivered,
                msg_in_store = false,
                index_on_disk = false,
                persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize),
                msg_props = MsgProps}.
+
%% Rebuild a #msg_status{} from a queue-index entry. The first clause
%% handles messages embedded in the index (payload present, so
%% persist_to = queue_index); the second handles store-resident
%% messages where only the id was recorded (payload must be read from
%% the msg store on demand).
beta_msg_status({Msg = #basic_message{id = MsgId},
                 SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    MS0#msg_status{msg_id = MsgId,
                   msg = Msg,
                   persist_to = queue_index,
                   msg_in_store = false};

beta_msg_status({MsgId, SeqId, MsgProps, IsPersistent, IsDelivered}) ->
    MS0 = beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered),
    MS0#msg_status{msg_id = MsgId,
                   msg = undefined,
                   persist_to = msg_store,
                   msg_in_store = true}.
+
%% Common fields of a beta msg_status: entries built here always came
%% from the queue index, hence index_on_disk = true and no payload.
beta_msg_status0(SeqId, MsgProps, IsPersistent, IsDelivered) ->
  #msg_status{seq_id = SeqId,
              msg = undefined,
              is_persistent = IsPersistent,
              is_delivered = IsDelivered,
              index_on_disk = true,
              msg_props = MsgProps}.
+
%% Drop the in-RAM payload when the message lives in the msg store
%% (it can be re-read on demand); index-embedded messages keep their
%% payload, since the payload and the index entry are one and the same.
trim_msg_status(MsgStatus) ->
    case persist_to(MsgStatus) of
        msg_store   -> MsgStatus#msg_status{msg = undefined};
        queue_index -> MsgStatus
    end.
+
%% Apply Fun to the persistent or transient msg-store client state
%% (selected by the IsPersistent flag), threading the updated client
%% back into the {Persistent, Transient} pair. Returns {Result, Pair}.
with_msg_store_state({MSCStateP, MSCStateT}, IsPersistent, Fun) ->
    case IsPersistent of
        true ->
            {Result, MSCStateP1} = Fun(MSCStateP),
            {Result, {MSCStateP1, MSCStateT}};
        false ->
            {Result, MSCStateT1} = Fun(MSCStateT),
            {Result, {MSCStateP, MSCStateT1}}
    end.
+
%% Like with_msg_store_state/3 but for operations that must not
%% change the client state: re-binding the already-bound MSCState in
%% the match deliberately asserts (via badmatch) that Fun left the
%% state untouched. Returns only Fun's result.
with_immutable_msg_store_state(MSCState, IsPersistent, Fun) ->
    {Res, MSCState} = with_msg_store_state(MSCState, IsPersistent,
                                           fun (MSCState1) ->
                                                   {Fun(MSCState1), MSCState1}
                                           end),
    Res.
+
%% Open a msg-store client under a freshly generated reference.
msg_store_client_init(MsgStore, MsgOnDiskFun, Callback) ->
    msg_store_client_init(MsgStore, rabbit_guid:gen(), MsgOnDiskFun,
                          Callback).
+
%% Open a msg-store client under an explicit reference. The callback
%% passed to the store closes our file descriptors on demand; only
%% the persistent store gets a real close-fds fun.
msg_store_client_init(MsgStore, Ref, MsgOnDiskFun, Callback) ->
    CloseFDsFun = msg_store_close_fds_fun(MsgStore =:= ?PERSISTENT_MSG_STORE),
    rabbit_msg_store:client_init(MsgStore, Ref, MsgOnDiskFun,
                                 fun () -> Callback(?MODULE, CloseFDsFun) end).
+
%% Write Msg to the appropriate (persistent/transient) store using the
%% flow-controlled write; the client state is treated as immutable.
msg_store_write(MSCState, IsPersistent, MsgId, Msg) ->
    with_immutable_msg_store_state(
      MSCState, IsPersistent,
      fun (MSCState1) ->
              rabbit_msg_store:write_flow(MsgId, Msg, MSCState1)
      end).
+
%% Read MsgId back from the appropriate store; reads may update the
%% client state (e.g. cached file handles), so it is threaded back.
msg_store_read(MSCState, IsPersistent, MsgId) ->
    with_msg_store_state(
      MSCState, IsPersistent,
      fun (MSCState1) ->
              rabbit_msg_store:read(MsgId, MSCState1)
      end).
+
%% Remove MsgIds from the appropriate (persistent/transient) message
%% store; the client state is treated as immutable for removal.
msg_store_remove(MSCState, IsPersistent, MsgIds) ->
    with_immutable_msg_store_state(
      MSCState, IsPersistent,
      fun (StoreState) ->
              rabbit_msg_store:remove(MsgIds, StoreState)
      end).
+
%% Ask one of the two store clients to close any file descriptors the
%% store has flagged for closing; returns {ok, UpdatedClients}.
msg_store_close_fds(MSCState, IsPersistent) ->
    with_msg_store_state(
      MSCState, IsPersistent,
      fun (MSCState1) -> rabbit_msg_store:close_all_indicated(MSCState1) end).
+
%% Build the callback handed to the msg store at client init: when
%% invoked (tagged with ?MODULE), it closes indicated fds and stores
%% the updated client pair back into the vqstate.
msg_store_close_fds_fun(IsPersistent) ->
    fun (?MODULE, State = #vqstate { msg_store_clients = MSCState }) ->
            {ok, MSCState1} = msg_store_close_fds(MSCState, IsPersistent),
            State #vqstate { msg_store_clients = MSCState1 }
    end.
+
%% Record a 'deliver' for SeqId in the queue index, but only when the
%% first argument says the delivery still needs writing.
maybe_write_delivered(NeedsWrite, SeqId, IndexState) ->
    case NeedsWrite of
        false -> IndexState;
        true  -> rabbit_queue_index:deliver([SeqId], IndexState)
    end.
+
%% Turn raw queue-index entries into a ?QUEUE of beta msg_statuses.
%% Entries older than TransientThreshold and transient are from a
%% previous incarnation of the queue: they are not kept but scheduled
%% for deliver+ack cleanup. Entries already pending ack are skipped.
%% Returns {Queue, RamReadyCount, RamBytes, NewState} where NewState
%% is State after DelsAndAcksFun has flushed the cleanup work.
betas_from_index_entries(List, TransientThreshold, DelsAndAcksFun, State) ->
    {Filtered, Delivers, Acks, RamReadyCount, RamBytes} =
        lists:foldr(
          fun ({_MsgOrId, SeqId, _MsgProps, IsPersistent, IsDelivered} = M,
               {Filtered1, Delivers1, Acks1, RRC, RB} = Acc) ->
                  case SeqId < TransientThreshold andalso not IsPersistent of
                    true -> {Filtered1,
                               cons_if(not IsDelivered, SeqId, Delivers1),
                               [SeqId | Acks1], RRC, RB};
                    false -> MsgStatus = m(beta_msg_status(M)),
                               HaveMsg = msg_in_ram(MsgStatus),
                               Size = msg_size(MsgStatus),
                               case is_msg_in_pending_acks(SeqId, State) of
                                   false -> {?QUEUE:in_r(MsgStatus, Filtered1),
                                             Delivers1, Acks1,
                                             RRC + one_if(HaveMsg),
                                             RB + one_if(HaveMsg) * Size};
                                   true -> Acc %% [0]
                               end
                  end
          end, {?QUEUE:new(), [], [], 0, 0}, List),
    {Filtered, RamReadyCount, RamBytes, DelsAndAcksFun(Delivers, Acks, State)}.
%% [0] We don't increase RamBytes here, even though it pertains to
%% unacked messages too, since if HaveMsg then the message must have
%% been stored in the QI, thus the message must have been in
%% qi_pending_ack, thus it must already have been in RAM.
+
%% True iff SeqId is awaiting acknowledgement in any of the three
%% pending-ack collections (RAM, disk, or queue-index embedded).
is_msg_in_pending_acks(SeqId, #vqstate { ram_pending_ack = RPA,
                                         disk_pending_ack = DPA,
                                         qi_pending_ack = QPA }) ->
    (gb_trees:is_defined(SeqId, RPA) orelse
     gb_trees:is_defined(SeqId, DPA) orelse
     gb_trees:is_defined(SeqId, QPA)).
+
%% Account for one more message (at SeqId) in the on-disk delta,
%% stretching the [start_seq_id, end_seq_id) interval as needed:
%% blank delta -> singleton; SeqId below the interval -> extend the
%% start; SeqId at/after the end -> extend the end; otherwise SeqId
%% falls inside the interval and only the count grows.
expand_delta(SeqId, ?BLANK_DELTA_PATTERN(X)) ->
    d(#delta { start_seq_id = SeqId, count = 1, end_seq_id = SeqId + 1 });
expand_delta(SeqId, #delta { start_seq_id = StartSeqId,
                             count = Count } = Delta)
  when SeqId < StartSeqId ->
    d(Delta #delta { start_seq_id = SeqId, count = Count + 1 });
expand_delta(SeqId, #delta { count = Count,
                             end_seq_id = EndSeqId } = Delta)
  when SeqId >= EndSeqId ->
    d(Delta #delta { count = Count + 1, end_seq_id = SeqId + 1 });
expand_delta(_SeqId, #delta { count = Count } = Delta) ->
    d(Delta #delta { count = Count + 1 }).
+
+%%----------------------------------------------------------------------------
+%% Internal major helpers for Public API
+%%----------------------------------------------------------------------------
+
%% Build the initial #vqstate{} from the recovered queue index.
%% DeltaCount/DeltaBytes are the recovery estimates; after a clean
%% shutdown the exact values are taken from the persisted Terms.
%% Everything recovered starts out in delta (on disk); the final
%% maybe_deltas_to_betas pulls the first batch into q3.
init(IsDurable, IndexState, DeltaCount, DeltaBytes, Terms,
     PersistentClient, TransientClient) ->
    {LowSeqId, NextSeqId, IndexState1} = rabbit_queue_index:bounds(IndexState),

    {DeltaCount1, DeltaBytes1} =
        case Terms of
            non_clean_shutdown -> {DeltaCount, DeltaBytes};
            _ -> {proplists:get_value(persistent_count,
                                      Terms, DeltaCount),
                  proplists:get_value(persistent_bytes,
                                      Terms, DeltaBytes)}
        end,
    Delta = case DeltaCount1 == 0 andalso DeltaCount /= undefined of
                true -> ?BLANK_DELTA;
                false -> d(#delta { start_seq_id = LowSeqId,
                                    count = DeltaCount1,
                                    end_seq_id = NextSeqId })
            end,
    Now = time_compat:monotonic_time(),
    IoBatchSize = rabbit_misc:get_env(rabbit, msg_store_io_batch_size,
                                      ?IO_BATCH_SIZE),

    {ok, IndexMaxSize} = application:get_env(
                           rabbit, queue_index_embed_msgs_below),
    State = #vqstate {
      q1 = ?QUEUE:new(),
      q2 = ?QUEUE:new(),
      delta = Delta,
      q3 = ?QUEUE:new(),
      q4 = ?QUEUE:new(),
      next_seq_id = NextSeqId,
      ram_pending_ack = gb_trees:empty(),
      disk_pending_ack = gb_trees:empty(),
      qi_pending_ack = gb_trees:empty(),
      index_state = IndexState1,
      msg_store_clients = {PersistentClient, TransientClient},
      durable = IsDurable,
      %% Anything below NextSeqId predates this incarnation; see
      %% betas_from_index_entries/4 for how the threshold is used.
      transient_threshold = NextSeqId,
      qi_embed_msgs_below = IndexMaxSize,

      len = DeltaCount1,
      persistent_count = DeltaCount1,
      bytes = DeltaBytes1,
      persistent_bytes = DeltaBytes1,

      target_ram_count = infinity,
      ram_msg_count = 0,
      ram_msg_count_prev = 0,
      ram_ack_count_prev = 0,
      ram_bytes = 0,
      unacked_bytes = 0,
      out_counter = 0,
      in_counter = 0,
      rates = blank_rates(Now),
      msgs_on_disk = gb_sets:new(),
      msg_indices_on_disk = gb_sets:new(),
      unconfirmed = gb_sets:new(),
      confirmed = gb_sets:new(),
      ack_out_counter = 0,
      ack_in_counter = 0,
      disk_read_count = 0,
      disk_write_count = 0,

      io_batch_size = IoBatchSize,

      mode = default },
    a(maybe_deltas_to_betas(State)).
+
%% Zeroed ingress/egress rate record, timestamped at Now (monotonic).
blank_rates(Now) ->
    #rates { in = 0.0,
             out = 0.0,
             ack_in = 0.0,
             ack_out = 0.0,
             timestamp = Now}.
+
%% Push a msg_status back onto the *front* of the queue (e.g. after a
%% requeue or an aborted fetch), preserving the per-mode placement
%% invariants (see a/1).
in_r(MsgStatus = #msg_status { msg = undefined },
     State = #vqstate { mode = default, q3 = Q3, q4 = Q4 }) ->
    case ?QUEUE:is_empty(Q4) of
        %% Payload-less statuses may sit in q3; if q4 has entries the
        %% returned message must go in front of them, so the payload
        %% is read back in and stats are adjusted for the RAM copy.
        true -> State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) };
        false -> {Msg, State1 = #vqstate { q4 = Q4a }} =
                     read_msg(MsgStatus, State),
                 MsgStatus1 = MsgStatus#msg_status{msg = Msg},
                 stats(ready0, {MsgStatus, MsgStatus1},
                       State1 #vqstate { q4 = ?QUEUE:in_r(MsgStatus1, Q4a) })
    end;
in_r(MsgStatus,
     State = #vqstate { mode = default, q4 = Q4 }) ->
    State #vqstate { q4 = ?QUEUE:in_r(MsgStatus, Q4) };
%% lazy queues
in_r(MsgStatus = #msg_status { seq_id = SeqId },
     State = #vqstate { mode = lazy, q3 = Q3, delta = Delta}) ->
    case ?QUEUE:is_empty(Q3) of
        true ->
            %% Empty q3: page the message straight to disk and fold it
            %% into delta, as lazy-mode publishing does.
            {_MsgStatus1, State1} =
                maybe_write_to_disk(true, true, MsgStatus, State),
            State2 = stats(ready0, {MsgStatus, none}, State1),
            Delta1 = expand_delta(SeqId, Delta),
            State2 #vqstate{ delta = Delta1 };
        false ->
            State #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3) }
    end.
+
%% Pop the next ready msg_status: q4 first, falling back to q3 (which
%% may page more in from delta). Lazy queues have no q4 and always go
%% through q3. Returns {empty, State} or {{value, MsgStatus}, State}.
queue_out(State = #vqstate { mode = default, q4 = Q4 }) ->
    case ?QUEUE:out(Q4) of
        {empty, _Q4} ->
            case fetch_from_q3(State) of
                {empty, _State1} = Result -> Result;
                {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
            end;
        {{value, MsgStatus}, Q4a} ->
            {{value, MsgStatus}, State #vqstate { q4 = Q4a }}
    end;
%% lazy queues
queue_out(State = #vqstate { mode = lazy }) ->
    case fetch_from_q3(State) of
        {empty, _State1} = Result -> Result;
        {loaded, {MsgStatus, State1}} -> {{value, MsgStatus}, State1}
    end.
+
%% Return the message payload for a msg_status, reading it from the
%% msg store only when it is not already held in RAM.
read_msg(#msg_status{msg = undefined,
                     msg_id = MsgId,
                     is_persistent = IsPersistent}, State) ->
    read_msg(MsgId, IsPersistent, State);
read_msg(#msg_status{msg = Msg}, State) ->
    {Msg, State}.
+
%% Read MsgId from the appropriate msg store, bumping the disk-read
%% counter. The match on #basic_message{} asserts the read succeeded.
read_msg(MsgId, IsPersistent, State = #vqstate{msg_store_clients = MSCState,
                                               disk_read_count = Count}) ->
    {{ok, Msg = #basic_message {}}, MSCState1} =
        msg_store_read(MSCState, IsPersistent, MsgId),
    {Msg, State #vqstate {msg_store_clients = MSCState1,
                          disk_read_count = Count + 1}}.
+
%% Update the vqstate counters for one message transition. Signs and
%% Statuses are shorthand forms expanded into the canonical triples
%% consumed by stats0/3 (see expand_signs/1, expand_statuses/1).
stats(Signs, Statuses, State) ->
    stats0(expand_signs(Signs), expand_statuses(Statuses), State).
+
%% Normalise a stats/3 'signs' shorthand into the canonical
%% {DeltaReady, DeltaUnacked, ReadyMsgPaged} triple used by stats0/3.
expand_signs(Signs) ->
    case Signs of
        ready0           -> {0, 0, true};
        lazy_pub         -> {1, 0, true};
        {Ready, Unacked} -> {Ready, Unacked, false}
    end.
+
%% Normalise a stats/3 'statuses' pair into {InRamBefore, InRamAfter,
%% RepresentativeMsgStatus}. NB: clause order is semantic -- {none, A}
%% must be tried before {B, none}, and {lazy, A} before the generic
%% {B, A} catch-all.
expand_statuses({none, A}) -> {false, msg_in_ram(A), A};
expand_statuses({B, none}) -> {msg_in_ram(B), false, B};
expand_statuses({lazy, A}) -> {false , false, A};
expand_statuses({B, A}) -> {msg_in_ram(B), msg_in_ram(A), B}.
+
%% In this function at least, we are religious: the variable name
%% contains "Ready" or "Unacked" iff that is what it counts. If
%% neither is present it counts both.
%%
%% Applies one message transition's worth of deltas to the len/bytes/
%% RAM/persistence counters. Inputs come pre-normalised from
%% expand_signs/1 and expand_statuses/1.
stats0({DeltaReady, DeltaUnacked, ReadyMsgPaged},
       {InRamBefore, InRamAfter, MsgStatus},
       State = #vqstate{len = ReadyCount,
                        bytes = ReadyBytes,
                        ram_msg_count = RamReadyCount,
                        persistent_count = PersistentCount,
                        unacked_bytes = UnackedBytes,
                        ram_bytes = RamBytes,
                        persistent_bytes = PersistentBytes}) ->
    S = msg_size(MsgStatus),
    DeltaTotal = DeltaReady + DeltaUnacked,
    %% Net change in the number of RAM-resident payload copies.
    DeltaRam = case {InRamBefore, InRamAfter} of
                   {false, false} -> 0;
                   {false, true} -> 1;
                   {true, false} -> -1;
                   {true, true} -> 0
               end,
    %% RAM-ready count moves only when the message enters/leaves the
    %% ready set in RAM, or (for in-place transitions) when a ready
    %% message is paged in or out.
    DeltaRamReady = case DeltaReady of
                        1 -> one_if(InRamAfter);
                        -1 -> -one_if(InRamBefore);
                        0 when ReadyMsgPaged -> DeltaRam;
                        0 -> 0
                    end,
    DeltaPersistent = DeltaTotal * one_if(MsgStatus#msg_status.is_persistent),
    State#vqstate{len = ReadyCount + DeltaReady,
                  ram_msg_count = RamReadyCount + DeltaRamReady,
                  persistent_count = PersistentCount + DeltaPersistent,
                  bytes = ReadyBytes + DeltaReady * S,
                  unacked_bytes = UnackedBytes + DeltaUnacked * S,
                  ram_bytes = RamBytes + DeltaRam * S,
                  persistent_bytes = PersistentBytes + DeltaPersistent * S}.
+
+msg_size(#msg_status{msg_props = #message_properties{size = Size}}) -> Size.
+
+msg_in_ram(#msg_status{msg = Msg}) -> Msg =/= undefined.
+
%% first param: AckRequired
%%
%% Remove one fetched message from the ready set. With AckRequired the
%% message moves to the pending-ack collections and its SeqId is
%% returned for later ack; without, it is deleted from the msg store
%% and queue index immediately and 'undefined' is returned.
remove(true, MsgStatus = #msg_status {
               seq_id = SeqId,
               is_delivered = IsDelivered,
               index_on_disk = IndexOnDisk },
       State = #vqstate {out_counter = OutCount,
                         index_state = IndexState}) ->
    %% Mark it delivered if necessary
    IndexState1 = maybe_write_delivered(
                    IndexOnDisk andalso not IsDelivered,
                    SeqId, IndexState),

    State1 = record_pending_ack(
               MsgStatus #msg_status {
                 is_delivered = true }, State),

    %% One fewer ready, one more unacked.
    State2 = stats({-1, 1}, {MsgStatus, MsgStatus}, State1),

    {SeqId, maybe_update_rates(
              State2 #vqstate {out_counter = OutCount + 1,
                               index_state = IndexState1})};

%% This function body has the same behaviour as remove_queue_entries/3
%% but instead of removing messages based on a ?QUEUE, this removes
%% just one message, the one referenced by the MsgStatus provided.
remove(false, MsgStatus = #msg_status {
                seq_id = SeqId,
                msg_id = MsgId,
                is_persistent = IsPersistent,
                is_delivered = IsDelivered,
                msg_in_store = MsgInStore,
                index_on_disk = IndexOnDisk },
       State = #vqstate {out_counter = OutCount,
                         index_state = IndexState,
                         msg_store_clients = MSCState}) ->
    %% Mark it delivered if necessary
    IndexState1 = maybe_write_delivered(
                    IndexOnDisk andalso not IsDelivered,
                    SeqId, IndexState),

    %% Remove from msg_store and queue index, if necessary
    case MsgInStore of
        true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
        false -> ok
    end,

    IndexState2 =
        case IndexOnDisk of
            true -> rabbit_queue_index:ack([SeqId], IndexState1);
            false -> IndexState1
        end,

    %% One fewer ready, nothing pending ack.
    State1 = stats({-1, 0}, {MsgStatus, none}, State),

    {undefined, maybe_update_rates(
                  State1 #vqstate {out_counter = OutCount + 1,
                                   index_state = IndexState2})}.
+
%% This function exists as a way to improve dropwhile/2
%% performance. The idea of having this function is to optimise calls
%% to rabbit_queue_index by batching delivers and acks, instead of
%% sending them one by one.
%%
%% Instead of removing every message as their are popped from the
%% queue, it first accumulates them and then removes them by calling
%% remove_queue_entries/3, since the behaviour of
%% remove_queue_entries/3 when used with
%% process_delivers_and_acks_fun(deliver_and_ack) is the same as
%% calling remove(false, MsgStatus, State).
%%
%% remove/3 also updates the out_counter in every call, but here we do
%% it just once at the end.
%%
%% Returns {MsgProps | undefined, State}: MsgProps belongs to the
%% first message that did NOT satisfy Pred (see collect_by_predicate).
remove_by_predicate(Pred, State = #vqstate {out_counter = OutCount}) ->
    {MsgProps, QAcc, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),
    State2 =
        remove_queue_entries(
          QAcc, process_delivers_and_acks_fun(deliver_and_ack), State1),
    %% maybe_update_rates/1 is called in remove/2 for every
    %% message. Since we update out_counter only once, we call it just
    %% there.
    {MsgProps, maybe_update_rates(
                 State2 #vqstate {
                   out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
%% This function exists as a way to improve fetchwhile/4
%% performance. The idea of having this function is to optimise calls
%% to rabbit_queue_index by batching delivers, instead of sending them
%% one by one.
%%
%% Fun is the function passed to fetchwhile/4 that's
%% applied to every fetched message and used to build the fetchwhile/4
%% result accumulator FetchAcc.
fetch_by_predicate(Pred, Fun, FetchAcc,
                   State = #vqstate {
                              index_state = IndexState,
                              out_counter = OutCount}) ->
    {MsgProps, QAcc, State1} =
        collect_by_predicate(Pred, ?QUEUE:new(), State),

    {Delivers, FetchAcc1, State2} =
        process_queue_entries(QAcc, Fun, FetchAcc, State1),

    %% Single batched 'deliver' for everything fetched above.
    IndexState1 = rabbit_queue_index:deliver(Delivers, IndexState),

    {MsgProps, FetchAcc1, maybe_update_rates(
                            State2 #vqstate {
                              index_state = IndexState1,
                              out_counter = OutCount + ?QUEUE:len(QAcc)})}.
+
%% We try to do here the same as what remove(true, State) does but
%% processing several messages at the same time. The idea is to
%% optimize rabbit_queue_index:deliver/2 calls by sending a list of
%% SeqIds instead of one by one, thus process_queue_entries1 will
%% accumulate the required deliveries, will record_pending_ack for
%% each message, and will update stats, like remove/2 does.
%%
%% For the meaning of Fun and FetchAcc arguments see
%% fetch_by_predicate/4 above.
process_queue_entries(Q, Fun, FetchAcc, State) ->
    ?QUEUE:foldl(fun (MsgStatus, Acc) ->
                         process_queue_entries1(MsgStatus, Fun, Acc)
                 end,
                 {[], FetchAcc, State}, Q).
+
%% Per-message step of process_queue_entries/4: read the payload,
%% move the message to pending-ack, accumulate its SeqId for the
%% batched index 'deliver', and feed Msg through the caller's Fun.
process_queue_entries1(
  #msg_status { seq_id = SeqId, is_delivered = IsDelivered,
                index_on_disk = IndexOnDisk} = MsgStatus,
  Fun,
  {Delivers, FetchAcc, State}) ->
    {Msg, State1} = read_msg(MsgStatus, State),
    State2 = record_pending_ack(
               MsgStatus #msg_status {
                 is_delivered = true }, State1),
    {cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
     Fun(Msg, SeqId, FetchAcc),
     stats({-1, 1}, {MsgStatus, MsgStatus}, State2)}.
+
%% Pop messages while Pred(MsgProps) holds, accumulating them into
%% QAcc. Stops at the first non-matching message, which is pushed back
%% onto the queue head; its MsgProps is returned ('undefined' if the
%% queue ran dry first).
collect_by_predicate(Pred, QAcc, State) ->
    case queue_out(State) of
        {empty, State1} ->
            {undefined, QAcc, State1};
        {{value, MsgStatus = #msg_status { msg_props = MsgProps }}, State1} ->
            case Pred(MsgProps) of
                true -> collect_by_predicate(Pred, ?QUEUE:in(MsgStatus, QAcc),
                                             State1);
                false -> {MsgProps, QAcc, in_r(MsgStatus, State1)}
            end
    end.
+
+%%----------------------------------------------------------------------------
+%% Helpers for Public API purge/1 function
+%%----------------------------------------------------------------------------
+
%% The difference between purge_when_pending_acks/1
%% vs. purge_and_index_reset/1 is that the first one issues a deliver
%% and an ack to the queue index for every message that's being
%% removed, while the later just resets the queue index state.
%%
%% Used when acks are still outstanding: the index cannot simply be
%% reset, as it still holds live pending-ack entries.
purge_when_pending_acks(State) ->
    State1 = purge1(process_delivers_and_acks_fun(deliver_and_ack), State),
    a(State1).
+
%% Purge everything and wipe the queue index wholesale; only valid
%% when no acks are pending (see purge_when_pending_acks/1 otherwise).
purge_and_index_reset(State) ->
    State1 = purge1(process_delivers_and_acks_fun(none), State),
    a(reset_qi_state(State1)).
+
%% This function removes messages from each of {q1, q2, q3, q4}.
%%
%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
%% are specially handled by purge_betas_and_deltas/2.
%%
%% purge_betas_and_deltas/2 loads messages from the queue index,
%% filling up q3 and in some cases moving messages form q2 to q3 while
%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The
%% messages loaded into q3 are removed by calling
%% remove_queue_entries/3 until there are no more messages to be read
%% from the queue index. Messages are read in batches from the queue
%% index.
purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
    State1 = remove_queue_entries(Q4, AfterFun, State),

    State2 = #vqstate {q1 = Q1} =
        purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),

    State3 = remove_queue_entries(Q1, AfterFun, State2),

    a(State3#vqstate{q1 = ?QUEUE:new()}).
+
%% Replace the queue-index state with a freshly reset one (discarding
%% all on-disk index content).
reset_qi_state(State = #vqstate{index_state = IndexState}) ->
    State#vqstate{index_state =
                         rabbit_queue_index:reset_state(IndexState)}.
+
%% True iff no message is awaiting acknowledgement in any of the
%% three pending-ack collections.
is_pending_ack_empty(State) ->
    0 =:= count_pending_acks(State).
+
%% Total number of messages pending acknowledgement across the RAM,
%% disk and queue-index-embedded pending-ack trees.
count_pending_acks(#vqstate { ram_pending_ack = RPA,
                              disk_pending_ack = DPA,
                              qi_pending_ack = QPA }) ->
    gb_trees:size(RPA) + gb_trees:size(DPA) + gb_trees:size(QPA).
+
%% Drain q3 (and, transitively, delta and q2) by repeatedly loading a
%% batch from the index into q3 and removing it, until q3 comes back
%% empty. Lazy queues must pre-load q3 once before the loop starts.
purge_betas_and_deltas(DelsAndAcksFun, State = #vqstate { mode = Mode }) ->
    State0 = #vqstate { q3 = Q3 } =
        case Mode of
            lazy -> maybe_deltas_to_betas(DelsAndAcksFun, State);
            _ -> State
        end,

    case ?QUEUE:is_empty(Q3) of
        true -> State0;
        false -> State1 = remove_queue_entries(Q3, DelsAndAcksFun, State0),
                 purge_betas_and_deltas(DelsAndAcksFun,
                                        maybe_deltas_to_betas(
                                          DelsAndAcksFun,
                                          State1#vqstate{q3 = ?QUEUE:new()}))
    end.
+
%% Bulk-remove every message in Q: batch the msg-store removals per
%% store (grouped by persistence), then hand the accumulated
%% deliver/ack SeqIds to DelsAndAcksFun for index processing.
remove_queue_entries(Q, DelsAndAcksFun,
                     State = #vqstate{msg_store_clients = MSCState}) ->
    {MsgIdsByStore, Delivers, Acks, State1} =
        ?QUEUE:foldl(fun remove_queue_entries1/2,
                     {orddict:new(), [], [], State}, Q),
    remove_msgs_by_id(MsgIdsByStore, MSCState),
    DelsAndAcksFun(Delivers, Acks, State1).
+
%% Fold step for remove_queue_entries/3: collect the msg id for store
%% removal (keyed by persistence), the deliver/ack SeqIds for the
%% index, and drop the message from the ready counters.
remove_queue_entries1(
  #msg_status { msg_id = MsgId, seq_id = SeqId, is_delivered = IsDelivered,
                msg_in_store = MsgInStore, index_on_disk = IndexOnDisk,
                is_persistent = IsPersistent} = MsgStatus,
  {MsgIdsByStore, Delivers, Acks, State}) ->
    {case MsgInStore of
         true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
         false -> MsgIdsByStore
     end,
     cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers),
     cons_if(IndexOnDisk, SeqId, Acks),
     stats({-1, 0}, {MsgStatus, none}, State)}.
+
%% Build the fun that flushes accumulated deliver/ack SeqIds to the
%% queue index. 'deliver_and_ack' writes both; anything else is a
%% no-op (used when the whole index is about to be reset anyway).
process_delivers_and_acks_fun(deliver_and_ack) ->
    fun (Delivers, Acks, State = #vqstate { index_state = IndexState }) ->
            IndexState1 =
                rabbit_queue_index:ack(
                  Acks, rabbit_queue_index:deliver(Delivers, IndexState)),
            State #vqstate { index_state = IndexState1 }
    end;
process_delivers_and_acks_fun(_) ->
    fun (_, _, State) ->
            State
    end.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for publishing
+%%----------------------------------------------------------------------------
+
%% Core of publish: enqueue one incoming message. In default mode the
%% message goes to q4 (or to q1 if q3 is non-empty, preserving FIFO
%% relative to paged-out messages); in lazy mode it is persisted
%% immediately and accounted in delta. PersistFun is either
%% maybe_write_to_disk/4 or maybe_prepare_write_to_disk/4 (batched).
publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
         MsgProps = #message_properties { needs_confirming = NeedsConfirming },
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate { q1 = Q1, q3 = Q3, q4 = Q4,
                            mode = default,
                            qi_embed_msgs_below = IndexMaxSize,
                            next_seq_id = SeqId,
                            in_counter = InCount,
                            durable = IsDurable,
                            unconfirmed = UC }) ->
    %% Only durable queues keep persistent messages across restarts.
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
    State2 = case ?QUEUE:is_empty(Q3) of
                 false -> State1 #vqstate { q1 = ?QUEUE:in(m(MsgStatus1), Q1) };
                 true -> State1 #vqstate { q4 = ?QUEUE:in(m(MsgStatus1), Q4) }
             end,
    InCount1 = InCount + 1,
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats({1, 0}, {none, MsgStatus1},
          State2#vqstate{ next_seq_id = SeqId + 1,
                          in_counter = InCount1,
                          unconfirmed = UC1 });
publish1(Msg = #basic_message { is_persistent = IsPersistent, id = MsgId },
         MsgProps = #message_properties { needs_confirming = NeedsConfirming },
         IsDelivered, _ChPid, _Flow, PersistFun,
         State = #vqstate { mode = lazy,
                            qi_embed_msgs_below = IndexMaxSize,
                            next_seq_id = SeqId,
                            in_counter = InCount,
                            durable = IsDurable,
                            unconfirmed = UC,
                            delta = Delta }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize),
    %% Lazy mode: force both payload and index entry to disk now.
    {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
    Delta1 = expand_delta(SeqId, Delta),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    stats(lazy_pub, {lazy, m(MsgStatus1)},
          State1#vqstate{ delta = Delta1,
                          next_seq_id = SeqId + 1,
                          in_counter = InCount + 1,
                          unconfirmed = UC1 }).
+
%% Fold step for batch publishing: publish one message using the
%% batched (pre_publish-based) index persistence path.
batch_publish1({Msg, MsgProps, IsDelivered}, {ChPid, Flow, State}) ->
    {ChPid, Flow, publish1(Msg, MsgProps, IsDelivered, ChPid, Flow,
                           fun maybe_prepare_write_to_disk/4, State)}.
+
%% Publish a message that is considered delivered straight away (e.g.
%% delivered directly to a consumer): it never joins the ready queues
%% and goes directly to pending-ack. Returns {SeqId, State} so the
%% caller can ack later. Lazy mode differs only in forcing the
%% message/index to disk.
publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
                                          id = MsgId },
                   MsgProps = #message_properties {
                     needs_confirming = NeedsConfirming },
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate { mode = default,
                                      qi_embed_msgs_below = IndexMaxSize,
                                      next_seq_id = SeqId,
                                      out_counter = OutCount,
                                      in_counter = InCount,
                                      durable = IsDurable,
                                      unconfirmed = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State),
    State2 = record_pending_ack(m(MsgStatus1), State1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    %% Zero ready, one unacked.
    State3 = stats({0, 1}, {none, MsgStatus1},
                   State2 #vqstate { next_seq_id = SeqId + 1,
                                     out_counter = OutCount + 1,
                                     in_counter = InCount + 1,
                                     unconfirmed = UC1 }),
    {SeqId, State3};
publish_delivered1(Msg = #basic_message { is_persistent = IsPersistent,
                                          id = MsgId },
                   MsgProps = #message_properties {
                     needs_confirming = NeedsConfirming },
                   _ChPid, _Flow, PersistFun,
                   State = #vqstate { mode = lazy,
                                      qi_embed_msgs_below = IndexMaxSize,
                                      next_seq_id = SeqId,
                                      out_counter = OutCount,
                                      in_counter = InCount,
                                      durable = IsDurable,
                                      unconfirmed = UC }) ->
    IsPersistent1 = IsDurable andalso IsPersistent,
    MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize),
    {MsgStatus1, State1} = PersistFun(true, true, MsgStatus, State),
    State2 = record_pending_ack(m(MsgStatus1), State1),
    UC1 = gb_sets_maybe_insert(NeedsConfirming, MsgId, UC),
    State3 = stats({0, 1}, {none, MsgStatus1},
                   State2 #vqstate { next_seq_id = SeqId + 1,
                                     out_counter = OutCount + 1,
                                     in_counter = InCount + 1,
                                     unconfirmed = UC1 }),
    {SeqId, State3}.
+
%% Fold step for batched publish-delivered: accumulates the SeqIds
%% (in reverse publish order) for the caller to ack later.
batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) ->
    {SeqId, State1} =
        publish_delivered1(Msg, MsgProps, ChPid, Flow,
                           fun maybe_prepare_write_to_disk/4,
                           State),
    {ChPid, Flow, [SeqId | SeqIds], State1}.
+
%% Write the message payload to the msg store if it isn't there yet
%% and the message is persistent (or Force is set). Messages destined
%% for the queue index are never written here -- they are persisted
%% together with their index entry instead.
maybe_write_msg_to_disk(_Force, MsgStatus = #msg_status {
                                  msg_in_store = true }, State) ->
    {MsgStatus, State};
maybe_write_msg_to_disk(Force, MsgStatus = #msg_status {
                                 msg = Msg, msg_id = MsgId,
                                 is_persistent = IsPersistent },
                        State = #vqstate{ msg_store_clients = MSCState,
                                          disk_write_count = Count})
  when Force orelse IsPersistent ->
    case persist_to(MsgStatus) of
        msg_store -> ok = msg_store_write(MSCState, IsPersistent, MsgId,
                                          prepare_to_store(Msg)),
                     {MsgStatus#msg_status{msg_in_store = true},
                      State#vqstate{disk_write_count = Count + 1}};
        queue_index -> {MsgStatus, State}
    end;
maybe_write_msg_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
+
%% Due to certain optimizations made inside
%% rabbit_queue_index:pre_publish/7 we need to have two separate
%% functions for index persistence. This one is only used when paging
%% during memory pressure. We didn't want to modify
%% maybe_write_index_to_disk/3 because that function is used in other
%% places.
maybe_batch_write_index_to_disk(_Force,
                                MsgStatus = #msg_status {
                                  index_on_disk = true }, State) ->
    %% Index entry already on disk: nothing to do.
    {MsgStatus, State};
maybe_batch_write_index_to_disk(Force,
                                MsgStatus = #msg_status {
                                  msg = Msg,
                                  msg_id = MsgId,
                                  seq_id = SeqId,
                                  is_persistent = IsPersistent,
                                  is_delivered = IsDelivered,
                                  msg_props = MsgProps},
                                State = #vqstate {
                                           target_ram_count = TargetRamCount,
                                           disk_write_count = DiskWriteCount,
                                           index_state = IndexState})
  when Force orelse IsPersistent ->
    %% Index-embedded messages carry their payload into the index
    %% entry (counted as a disk write); store-resident ones only
    %% record their id.
    {MsgOrId, DiskWriteCount1} =
        case persist_to(MsgStatus) of
            msg_store -> {MsgId, DiskWriteCount};
            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
        end,
    IndexState1 = rabbit_queue_index:pre_publish(
                    MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered,
                    TargetRamCount, IndexState),
    {MsgStatus#msg_status{index_on_disk = true},
     State#vqstate{index_state = IndexState1,
                   disk_write_count = DiskWriteCount1}};
maybe_batch_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
+
%% Publish the message's entry to the queue index if not there yet and
%% the message is persistent (or Force is set); also records the
%% 'deliver' when appropriate. Non-batched counterpart of
%% maybe_batch_write_index_to_disk/3.
maybe_write_index_to_disk(_Force, MsgStatus = #msg_status {
                                    index_on_disk = true }, State) ->
    {MsgStatus, State};
maybe_write_index_to_disk(Force, MsgStatus = #msg_status {
                                   msg = Msg,
                                   msg_id = MsgId,
                                   seq_id = SeqId,
                                   is_persistent = IsPersistent,
                                   is_delivered = IsDelivered,
                                   msg_props = MsgProps},
                          State = #vqstate{target_ram_count = TargetRamCount,
                                           disk_write_count = DiskWriteCount,
                                           index_state = IndexState})
  when Force orelse IsPersistent ->
    %% Embedded messages write their payload with the index entry.
    {MsgOrId, DiskWriteCount1} =
        case persist_to(MsgStatus) of
            msg_store -> {MsgId, DiskWriteCount};
            queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1}
        end,
    IndexState1 = rabbit_queue_index:publish(
                    MsgOrId, SeqId, MsgProps, IsPersistent, TargetRamCount,
                    IndexState),
    IndexState2 = maybe_write_delivered(IsDelivered, SeqId, IndexState1),
    {MsgStatus#msg_status{index_on_disk = true},
     State#vqstate{index_state = IndexState2,
                   disk_write_count = DiskWriteCount1}};

maybe_write_index_to_disk(_Force, MsgStatus, State) ->
    {MsgStatus, State}.
+
+%% Writes the message body (if required) and then its index entry,
+%% synchronously.
+maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+    maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
+%% As above, but routes the index entry through the pre-publish batch
+%% cache (maybe_batch_write_index_to_disk/3). Used when paging under
+%% memory pressure; the cache is flushed later by ui/1.
+maybe_prepare_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) ->
+    {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State),
+    maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1).
+
+%% Decides whether a message's body should live in the shared
+%% msg_store or be embedded directly in the queue index. Returns the
+%% atom 'msg_store' or 'queue_index'; the result is stored in
+%% #msg_status.persist_to and read back via persist_to/1.
+determine_persist_to(#basic_message{
+                        content = #content{properties = Props,
+                                           properties_bin = PropsBin}},
+                     #message_properties{size = BodySize},
+                     IndexMaxSize) ->
+    %% The >= is so that you can set the env to 0 and never persist
+    %% to the index.
+    %%
+    %% We want this to be fast, so we avoid size(term_to_binary())
+    %% here, or using the term size estimation from truncate.erl, both
+    %% of which are too slow. So instead, if the message body size
+    %% goes over the limit then we avoid any other checks.
+    %%
+    %% If it doesn't we need to decide if the properties will push
+    %% it past the limit. If we have the encoded properties (usual
+    %% case) we can just check their size. If we don't (message came
+    %% via the direct client), we make a guess based on the number of
+    %% headers.
+    case BodySize >= IndexMaxSize of
+        true  -> msg_store;
+        false -> Est = case is_binary(PropsBin) of
+                           true  -> BodySize + size(PropsBin);
+                           false -> #'P_basic'{headers = Hs} = Props,
+                                    case Hs of
+                                        undefined -> 0;
+                                        _         -> length(Hs)
+                                    end * ?HEADER_GUESS_SIZE + BodySize
+                       end,
+                 case Est >= IndexMaxSize of
+                     true  -> msg_store;
+                     false -> queue_index
+                 end
+    end.
+
+%% Reads back the routing decision previously computed by
+%% determine_persist_to/3.
+persist_to(#msg_status{persist_to = To}) -> To.
+
+%% Strips re-derivable decoded content before a body is written out.
+prepare_to_store(Msg) ->
+    Msg#basic_message{
+      %% don't persist any recoverable decoded properties
+      content = rabbit_binary_parser:clear_decoded_content(
+                  Msg #basic_message.content)}.
+
+%%----------------------------------------------------------------------------
+%% Internal gubbins for acks
+%%----------------------------------------------------------------------------
+
+%% Unacked messages are tracked in three gb_trees keyed by seq id:
+%%   ram_pending_ack  (RPA): payload still in RAM, routed to msg_store;
+%%   disk_pending_ack (DPA): payload no longer in RAM;
+%%   qi_pending_ack   (QPA): message routed to the queue index.
+%% record_pending_ack/2 inserts into exactly one of the three and
+%% bumps ack_in_counter.
+record_pending_ack(#msg_status { seq_id = SeqId } = MsgStatus,
+                   State = #vqstate { ram_pending_ack = RPA,
+                                      disk_pending_ack = DPA,
+                                      qi_pending_ack = QPA,
+                                      ack_in_counter = AckInCount}) ->
+    Insert = fun (Tree) -> gb_trees:insert(SeqId, MsgStatus, Tree) end,
+    {RPA1, DPA1, QPA1} =
+        case {msg_in_ram(MsgStatus), persist_to(MsgStatus)} of
+            {false, _} -> {RPA, Insert(DPA), QPA};
+            {_, queue_index} -> {RPA, DPA, Insert(QPA)};
+            {_, msg_store} -> {Insert(RPA), DPA, QPA}
+        end,
+    State #vqstate { ram_pending_ack = RPA1,
+                     disk_pending_ack = DPA1,
+                     qi_pending_ack = QPA1,
+                     ack_in_counter = AckInCount + 1}.
+
+%% Finds the #msg_status for a pending-ack seq id, trying RPA, then
+%% DPA, then QPA. The final gb_trees:get/2 crashes if the seq id is
+%% in none of the trees — an unknown ack is a caller bug.
+lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
+                                     disk_pending_ack = DPA,
+                                     qi_pending_ack = QPA}) ->
+    case gb_trees:lookup(SeqId, RPA) of
+        {value, V} -> V;
+        none       -> case gb_trees:lookup(SeqId, DPA) of
+                          {value, V} -> V;
+                          none -> gb_trees:get(SeqId, QPA)
+                      end
+    end.
+
+%% First parameter = UpdateStats. When true, removal also folds a
+%% "one fewer unacked" delta into the stats counters; the false form
+%% just deletes from whichever tree holds the seq id and returns the
+%% removed #msg_status.
+remove_pending_ack(true, SeqId, State) ->
+    {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
+    {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
+remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
+                                                  disk_pending_ack = DPA,
+                                                  qi_pending_ack = QPA}) ->
+    case gb_trees:lookup(SeqId, RPA) of
+        {value, V} -> RPA1 = gb_trees:delete(SeqId, RPA),
+                      {V, State #vqstate { ram_pending_ack = RPA1 }};
+        none       -> case gb_trees:lookup(SeqId, DPA) of
+                          {value, V} ->
+                              DPA1 = gb_trees:delete(SeqId, DPA),
+                              {V, State#vqstate{disk_pending_ack = DPA1}};
+                          none ->
+                              QPA1 = gb_trees:delete(SeqId, QPA),
+                              {gb_trees:get(SeqId, QPA),
+                               State#vqstate{qi_pending_ack = QPA1}}
+                      end
+    end.
+
+%% Drops every pending ack. With KeepPersistent = true only transient
+%% bodies are removed from the msg_store and the queue index is left
+%% untouched; with false, on-disk index entries are acked and bodies
+%% are removed from both stores.
+purge_pending_ack(KeepPersistent,
+                  State = #vqstate { index_state = IndexState,
+                                     msg_store_clients = MSCState }) ->
+    {IndexOnDiskSeqIds, MsgIdsByStore, State1} = purge_pending_ack1(State),
+    case KeepPersistent of
+        true  -> remove_transient_msgs_by_id(MsgIdsByStore, MSCState),
+                 State1;
+        false -> IndexState1 =
+                     rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
+                 remove_msgs_by_id(MsgIdsByStore, MSCState),
+                 State1 #vqstate { index_state = IndexState1 }
+    end.
+
+%% As above but for queue deletion: the whole index is deleted, so no
+%% per-seq-id acks are needed — only the store bodies are removed.
+purge_pending_ack_delete_and_terminate(
+  State = #vqstate { index_state = IndexState,
+                     msg_store_clients = MSCState }) ->
+    {_, MsgIdsByStore, State1} = purge_pending_ack1(State),
+    IndexState1 = rabbit_queue_index:delete_and_terminate(IndexState),
+    remove_msgs_by_id(MsgIdsByStore, MSCState),
+    State1 #vqstate { index_state = IndexState1 }.
+
+%% Folds accumulate_ack/2 over all three pending-ack trees and resets
+%% them to empty; returns the accumulated on-disk seq ids and the
+%% per-store msg id orddict.
+purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA,
+                                      disk_pending_ack = DPA,
+                                      qi_pending_ack = QPA }) ->
+    F = fun (_SeqId, MsgStatus, Acc) -> accumulate_ack(MsgStatus, Acc) end,
+    {IndexOnDiskSeqIds, MsgIdsByStore, _AllMsgIds} =
+        rabbit_misc:gb_trees_fold(
+          F, rabbit_misc:gb_trees_fold(
+               F, rabbit_misc:gb_trees_fold(
+                    F, accumulate_ack_init(), RPA), DPA), QPA),
+    State1 = State #vqstate { ram_pending_ack = gb_trees:empty(),
+                              disk_pending_ack = gb_trees:empty(),
+                              qi_pending_ack = gb_trees:empty()},
+    {IndexOnDiskSeqIds, MsgIdsByStore, State1}.
+
+%% MsgIdsByStore is an orddict with two keys:
+%%
+%% true: holds a list of Persistent Message Ids.
+%% false: holds a list of Transient Message Ids.
+%%
+%% When we call orddict:to_list/1 we get two sets of msg ids, where
+%% IsPersistent is either true for persistent messages or false for
+%% transient ones. The msg_store_remove/3 function takes this boolean
+%% flag to determine from which store the messages should be removed.
+remove_msgs_by_id(MsgIdsByStore, MSCState) ->
+    [ok = msg_store_remove(MSCState, IsPersistent, MsgIds)
+     || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)].
+
+%% Removes only the transient ids (the 'false' bucket); a missing
+%% bucket simply means there was nothing transient to remove.
+remove_transient_msgs_by_id(MsgIdsByStore, MSCState) ->
+    case orddict:find(false, MsgIdsByStore) of
+        error        -> ok;
+        {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds)
+    end.
+
+%% Accumulator shape: {IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}.
+accumulate_ack_init() -> {[], orddict:new(), []}.
+
+%% Folds one pending ack into the accumulator: its seq id (only if
+%% its index entry is on disk), its msg id bucketed by persistence
+%% (only if its body is in a store), and its msg id unconditionally.
+accumulate_ack(#msg_status { seq_id        = SeqId,
+                             msg_id        = MsgId,
+                             is_persistent = IsPersistent,
+                             msg_in_store  = MsgInStore,
+                             index_on_disk = IndexOnDisk },
+               {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) ->
+    {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc),
+     case MsgInStore of
+         true  -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore);
+         false -> MsgIdsByStore
+     end,
+     [MsgId | AllMsgIds]}.
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing for confirms (aka publisher acks)
+%%----------------------------------------------------------------------------
+
+%% Marks a set of msg ids as confirmed: they are removed from the
+%% three tracking sets (msgs_on_disk, msg_indices_on_disk,
+%% unconfirmed) and added to 'confirmed'.
+record_confirms(MsgIdSet, State = #vqstate { msgs_on_disk        = MOD,
+                                             msg_indices_on_disk = MIOD,
+                                             unconfirmed         = UC,
+                                             confirmed           = C }) ->
+    State #vqstate {
+      msgs_on_disk        = rabbit_misc:gb_sets_difference(MOD,  MsgIdSet),
+      msg_indices_on_disk = rabbit_misc:gb_sets_difference(MIOD, MsgIdSet),
+      unconfirmed         = rabbit_misc:gb_sets_difference(UC,   MsgIdSet),
+      confirmed           = gb_sets:union(C, MsgIdSet) }.
+
+%% Callback invoked (asynchronously, via the supplied Callback) when
+%% the msg_store reports on a write. 'ignored' means no write was
+%% needed, so confirm outright; 'written' means the bodies are on
+%% disk, so confirm only those whose index entries are on disk too,
+%% and remember the rest in msgs_on_disk.
+msgs_written_to_disk(Callback, MsgIdSet, ignored) ->
+    Callback(?MODULE,
+             fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end);
+msgs_written_to_disk(Callback, MsgIdSet, written) ->
+    Callback(?MODULE,
+             fun (?MODULE, State = #vqstate { msgs_on_disk        = MOD,
+                                              msg_indices_on_disk = MIOD,
+                                              unconfirmed         = UC }) ->
+                     Confirmed = gb_sets:intersection(UC, MsgIdSet),
+                     record_confirms(gb_sets:intersection(MsgIdSet, MIOD),
+                                     State #vqstate {
+                                       msgs_on_disk =
+                                           gb_sets:union(MOD, Confirmed) })
+             end).
+
+%% Mirror image of msgs_written_to_disk/3, fired when the queue index
+%% has persisted the entries: confirm ids whose bodies are already on
+%% disk, remember the rest in msg_indices_on_disk.
+msg_indices_written_to_disk(Callback, MsgIdSet) ->
+    Callback(?MODULE,
+             fun (?MODULE, State = #vqstate { msgs_on_disk        = MOD,
+                                              msg_indices_on_disk = MIOD,
+                                              unconfirmed         = UC }) ->
+                     Confirmed = gb_sets:intersection(UC, MsgIdSet),
+                     record_confirms(gb_sets:intersection(MsgIdSet, MOD),
+                                     State #vqstate {
+                                       msg_indices_on_disk =
+                                           gb_sets:union(MIOD, Confirmed) })
+             end).
+
+%% Body and index entry were persisted together: confirm directly.
+msgs_and_indices_written_to_disk(Callback, MsgIdSet) ->
+    Callback(?MODULE,
+             fun (?MODULE, State) -> record_confirms(MsgIdSet, State) end).
+
+%%----------------------------------------------------------------------------
+%% Internal plumbing for requeue
+%%----------------------------------------------------------------------------
+
+%% Requeue a message as an alpha (fully in RAM), re-reading the body
+%% from disk if it is no longer in RAM. stats({1, -1}, ...) records
+%% one more ready message and one fewer unacked.
+publish_alpha(#msg_status { msg = undefined } = MsgStatus, State) ->
+    {Msg, State1} = read_msg(MsgStatus, State),
+    MsgStatus1 = MsgStatus#msg_status { msg = Msg },
+    {MsgStatus1, stats({1, -1}, {MsgStatus, MsgStatus1}, State1)};
+publish_alpha(MsgStatus, State) ->
+    {MsgStatus, stats({1, -1}, {MsgStatus, MsgStatus}, State)}.
+
+%% Requeue a message as a beta: force the body to disk (index write
+%% not forced) and trim the in-RAM copy.
+publish_beta(MsgStatus, State) ->
+    {MsgStatus1, State1} = maybe_write_to_disk(true, false, MsgStatus, State),
+    MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+    {MsgStatus2, stats({1, -1}, {MsgStatus, MsgStatus2}, State1)}.
+
+%% Rebuild queue, inserting sequence ids to maintain ordering
+queue_merge(SeqIds, Q, MsgIds, Limit, PubFun, State) ->
+    queue_merge(SeqIds, Q, ?QUEUE:new(), MsgIds,
+                Limit, PubFun, State).
+
+%% Merges requeued seq ids (ascending) back into queue Q, republishing
+%% each via PubFun; 'Front' accumulates the merged prefix. Stops once
+%% a seq id reaches Limit and returns {RemainingSeqIds, MergedQ,
+%% MsgIds, State}.
+queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
+            Limit, PubFun, State)
+  when Limit == undefined orelse SeqId < Limit ->
+    case ?QUEUE:out(Q) of
+        {{value, #msg_status { seq_id = SeqIdQ } = MsgStatus}, Q1}
+          when SeqIdQ < SeqId ->
+            %% enqueue from the remaining queue
+            queue_merge(SeqIds, Q1, ?QUEUE:in(MsgStatus, Front), MsgIds,
+                        Limit, PubFun, State);
+        {_, _Q1} ->
+            %% enqueue from the remaining list of sequence ids
+            {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
+            {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+                PubFun(MsgStatus, State1),
+            queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+                        Limit, PubFun, State2)
+    end;
+queue_merge(SeqIds, Q, Front, MsgIds,
+            _Limit, _PubFun, State) ->
+    {SeqIds, ?QUEUE:join(Front, Q), MsgIds, State}.
+
+%% Folds the remaining requeued seq ids into delta, forcing both body
+%% and index entry to disk for each.
+delta_merge([], Delta, MsgIds, State) ->
+    {Delta, MsgIds, State};
+delta_merge(SeqIds, Delta, MsgIds, State) ->
+    lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
+                        {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
+                            msg_from_pending_ack(SeqId, State0),
+                        {_MsgStatus, State2} =
+                            maybe_write_to_disk(true, true, MsgStatus, State1),
+                        {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+                         stats({1, -1}, {MsgStatus, none}, State2)}
+                end, {Delta, MsgIds, State}, SeqIds).
+
+%% Mostly opposite of record_pending_ack/2: removes the pending ack
+%% and clears needs_confirming (a requeued message must not be
+%% confirmed again).
+msg_from_pending_ack(SeqId, State) ->
+    {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
+        remove_pending_ack(false, SeqId, State),
+    {MsgStatus #msg_status {
+       msg_props = MsgProps #message_properties { needs_confirming = false } },
+     State1}.
+
+%% Smallest seq id in a beta queue, or 'undefined' when empty.
+beta_limit(Q) ->
+    case ?QUEUE:peek(Q) of
+        {value, #msg_status { seq_id = SeqId }} -> SeqId;
+        empty -> undefined
+    end.
+
+%% Start seq id of delta, or 'undefined' when delta is blank.
+delta_limit(?BLANK_DELTA_PATTERN(_X)) -> undefined;
+delta_limit(#delta { start_seq_id = StartSeqId }) -> StartSeqId.
+
+%%----------------------------------------------------------------------------
+%% Iterator
+%%----------------------------------------------------------------------------
+
+%% Each iterator is a tagged gb_trees iterator over one of the three
+%% pending-ack trees.
+ram_ack_iterator(State) ->
+    {ack, gb_trees:iterator(State#vqstate.ram_pending_ack)}.
+
+disk_ack_iterator(State) ->
+    {ack, gb_trees:iterator(State#vqstate.disk_pending_ack)}.
+
+qi_ack_iterator(State) ->
+    {ack, gb_trees:iterator(State#vqstate.qi_pending_ack)}.
+
+%% Iterator over ready messages, walking q4, q3, delta, q2, q1 in
+%% that order (see istate/2 below).
+msg_iterator(State) -> istate(start, State).
+
+%% State machine advancing the message iterator from one sub-queue to
+%% the next.
+istate(start, State) -> {q4,    State#vqstate.q4,    State};
+istate(q4,    State) -> {q3,    State#vqstate.q3,    State};
+istate(q3,    State) -> {delta, State#vqstate.delta, State};
+istate(delta, State) -> {q2,    State#vqstate.q2,    State};
+istate(q2,    State) -> {q1,    State#vqstate.q1,    State};
+istate(q1,   _State) -> done.
+
+%% Steps an iterator. Yields {value, MsgStatus, Unacked, NextIt,
+%% IndexState} or {empty, IndexState}. The boolean flags whether the
+%% message came from a pending-ack tree. Walking delta reads index
+%% segments lazily, one boundary-sized chunk at a time, skipping
+%% entries that are pending acks (they are yielded by the ack
+%% iterators instead).
+next({ack, It}, IndexState) ->
+    case gb_trees:next(It) of
+        none -> {empty, IndexState};
+        {_SeqId, MsgStatus, It1} -> Next = {ack, It1},
+                                    {value, MsgStatus, true, Next, IndexState}
+    end;
+next(done, IndexState) -> {empty, IndexState};
+next({delta, #delta{start_seq_id = SeqId,
+                    end_seq_id   = SeqId}, State}, IndexState) ->
+    next(istate(delta, State), IndexState);
+next({delta, #delta{start_seq_id = SeqId,
+                    end_seq_id   = SeqIdEnd} = Delta, State}, IndexState) ->
+    SeqIdB = rabbit_queue_index:next_segment_boundary(SeqId),
+    SeqId1 = lists:min([SeqIdB, SeqIdEnd]),
+    {List, IndexState1} = rabbit_queue_index:read(SeqId, SeqId1, IndexState),
+    next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1);
+next({delta, Delta, [], State}, IndexState) ->
+    next({delta, Delta, State}, IndexState);
+next({delta, Delta, [{_, SeqId, _, _, _} = M | Rest], State}, IndexState) ->
+    case is_msg_in_pending_acks(SeqId, State) of
+        false -> Next = {delta, Delta, Rest, State},
+                 {value, beta_msg_status(M), false, Next, IndexState};
+        true  -> next({delta, Delta, Rest, State}, IndexState)
+    end;
+next({Key, Q, State}, IndexState) ->
+    case ?QUEUE:out(Q) of
+        {empty, _Q}              -> next(istate(Key, State), IndexState);
+        {{value, MsgStatus}, QN} -> Next = {Key, QN, State},
+                                    {value, MsgStatus, false, Next, IndexState}
+    end.
+
+%% Advances one iterator and, unless exhausted, pushes its next
+%% element back onto the working list of iterators.
+inext(It, {Its, IndexState}) ->
+    case next(It, IndexState) of
+        {empty, IndexState1} ->
+            {Its, IndexState1};
+        {value, MsgStatus1, Unacked, It1, IndexState1} ->
+            {[{MsgStatus1, Unacked, It1} | Its], IndexState1}
+    end.
+
+%% K-way merge fold over a list of iterators: repeatedly picks the
+%% element with the smallest seq id, reads its body, and feeds it to
+%% Fun, which returns {cont, Acc} to continue or {stop, Acc} to bail.
+ifold(_Fun, Acc, [], State) ->
+    {Acc, State};
+ifold(Fun, Acc, Its, State) ->
+    [{MsgStatus, Unacked, It} | Rest] =
+        lists:sort(fun ({#msg_status{seq_id = SeqId1}, _, _},
+                        {#msg_status{seq_id = SeqId2}, _, _}) ->
+                           SeqId1 =< SeqId2
+                   end, Its),
+    {Msg, State1} = read_msg(MsgStatus, State),
+    case Fun(Msg, MsgStatus#msg_status.msg_props, Unacked, Acc) of
+        {stop, Acc1} ->
+            {Acc1, State};
+        {cont, Acc1} ->
+            {Its1, IndexState1} = inext(It, {Rest, State1#vqstate.index_state}),
+            ifold(Fun, Acc1, Its1, State1#vqstate{index_state = IndexState1})
+    end.
+
+%%----------------------------------------------------------------------------
+%% Phase changes
+%%----------------------------------------------------------------------------
+
+%% Pages message content out of RAM until ram usage approaches
+%% target_ram_count: first alphas -> betas and RAM acks -> disk, then
+%% (in batches of at least io_batch_size) betas -> delta. A target of
+%% 'infinity' means no paging at all.
+reduce_memory_use(State = #vqstate { target_ram_count = infinity }) ->
+    State;
+reduce_memory_use(State = #vqstate {
+                    mode             = default,
+                    ram_pending_ack  = RPA,
+                    ram_msg_count    = RamMsgCount,
+                    target_ram_count = TargetRamCount,
+                    io_batch_size    = IoBatchSize,
+                    rates            = #rates { in      = AvgIngress,
+                                                out     = AvgEgress,
+                                                ack_in  = AvgAckIngress,
+                                                ack_out = AvgAckEgress } }) ->
+
+    State1 = #vqstate { q2 = Q2, q3 = Q3 } =
+        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+            0  -> State;
+            %% Reduce memory of pending acks and alphas. The order is
+            %% determined based on which is growing faster. Whichever
+            %% comes second may very well get a quota of 0 if the
+            %% first manages to push out the max number of messages.
+            S1 -> Funs = case ((AvgAckIngress - AvgAckEgress) >
+                                   (AvgIngress - AvgEgress)) of
+                             true  -> [fun limit_ram_acks/2,
+                                       fun push_alphas_to_betas/2];
+                             false -> [fun push_alphas_to_betas/2,
+                                       fun limit_ram_acks/2]
+                         end,
+                  {_, State2} = lists:foldl(fun (ReduceFun, {QuotaN, StateN}) ->
+                                                    ReduceFun(QuotaN, StateN)
+                                            end, {S1, State}, Funs),
+                  State2
+        end,
+
+    State3 =
+        case chunk_size(?QUEUE:len(Q2) + ?QUEUE:len(Q3),
+                        permitted_beta_count(State1)) of
+            S2 when S2 >= IoBatchSize ->
+                %% There is an implicit, but subtle, upper bound here. We
+                %% may shuffle a lot of messages from Q2/3 into delta, but
+                %% the number of these that require any disk operation,
+                %% namely index writing, i.e. messages that are genuine
+                %% betas and not gammas, is bounded by the credit_flow
+                %% limiting of the alpha->beta conversion above.
+                push_betas_to_deltas(S2, State1);
+            _ ->
+                State1
+        end,
+    %% See rabbitmq-server-290 for the reasons behind this GC call.
+    garbage_collect(),
+    State3;
+%% When using lazy queues, there are no alphas, so we don't need to
+%% call push_alphas_to_betas/2.
+reduce_memory_use(State = #vqstate {
+                    mode = lazy,
+                    ram_pending_ack  = RPA,
+                    ram_msg_count    = RamMsgCount,
+                    target_ram_count = TargetRamCount }) ->
+    State1 = #vqstate { q3 = Q3 } =
+        case chunk_size(RamMsgCount + gb_trees:size(RPA), TargetRamCount) of
+            0  -> State;
+            S1 -> {_, State2} = limit_ram_acks(S1, State),
+                  State2
+        end,
+
+    State3 =
+        case chunk_size(?QUEUE:len(Q3),
+                        permitted_beta_count(State1)) of
+            0  ->
+                State1;
+            S2 ->
+                push_betas_to_deltas(S2, State1)
+        end,
+    garbage_collect(),
+    State3.
+
+%% Pages up to Quota RAM pending acks to disk, taking the *largest*
+%% seq ids first (newest messages — the ones least likely to be acked
+%% soon). Index writes go through the batch cache, hence the ui/1
+%% flush on every exit path.
+limit_ram_acks(0, State) ->
+    {0, ui(State)};
+limit_ram_acks(Quota, State = #vqstate { ram_pending_ack  = RPA,
+                                         disk_pending_ack = DPA }) ->
+    case gb_trees:is_empty(RPA) of
+        true ->
+            {Quota, ui(State)};
+        false ->
+            {SeqId, MsgStatus, RPA1} = gb_trees:take_largest(RPA),
+            {MsgStatus1, State1} =
+                maybe_prepare_write_to_disk(true, false, MsgStatus, State),
+            MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+            DPA1 = gb_trees:insert(SeqId, MsgStatus2, DPA),
+            limit_ram_acks(Quota - 1,
+                           stats({0, 0}, {MsgStatus, MsgStatus2},
+                                 State1 #vqstate { ram_pending_ack  = RPA1,
+                                                   disk_pending_ack = DPA1 }))
+    end.
+
+%% How many betas the queue may hold before paging into delta:
+%% unlimited when empty; the RAM target itself for lazy queues; at
+%% most one index segment when the target is 0; otherwise a quadratic
+%% interpolation between segment size and the beta/delta population.
+permitted_beta_count(#vqstate { len = 0 }) ->
+    infinity;
+permitted_beta_count(#vqstate { mode             = lazy,
+                                target_ram_count = TargetRamCount}) ->
+    TargetRamCount;
+permitted_beta_count(#vqstate { target_ram_count = 0, q3 = Q3 }) ->
+    lists:min([?QUEUE:len(Q3), rabbit_queue_index:next_segment_boundary(0)]);
+permitted_beta_count(#vqstate { q1               = Q1,
+                                q4               = Q4,
+                                target_ram_count = TargetRamCount,
+                                len              = Len }) ->
+    BetaDelta = Len - ?QUEUE:len(Q1) - ?QUEUE:len(Q4),
+    lists:max([rabbit_queue_index:next_segment_boundary(0),
+               BetaDelta - ((BetaDelta * BetaDelta) div
+                                (BetaDelta + TargetRamCount))]).
+
+%% Excess of Current over Permitted, or 0 when within the limit.
+chunk_size(Current, Permitted)
+  when Permitted =:= infinity orelse Permitted >= Current ->
+    0;
+chunk_size(Current, Permitted) ->
+    Current - Permitted.
+
+%% Pops the next message from q3, maintaining the q1..q4/delta
+%% invariants. Returns {loaded, {MsgStatus, State1}} or
+%% {empty, State}.
+fetch_from_q3(State = #vqstate { mode  = default,
+                                 q1    = Q1,
+                                 q2    = Q2,
+                                 delta = #delta { count = DeltaCount },
+                                 q3    = Q3,
+                                 q4    = Q4 }) ->
+    case ?QUEUE:out(Q3) of
+        {empty, _Q3} ->
+            {empty, State};
+        {{value, MsgStatus}, Q3a} ->
+            State1 = State #vqstate { q3 = Q3a },
+            State2 = case {?QUEUE:is_empty(Q3a), 0 == DeltaCount} of
+                         {true, true} ->
+                             %% q3 is now empty, it wasn't before;
+                             %% delta is still empty. So q2 must be
+                             %% empty, and we know q4 is empty
+                             %% otherwise we wouldn't be loading from
+                             %% q3. As such, we can just set q4 to Q1.
+                             true = ?QUEUE:is_empty(Q2), %% ASSERTION
+                             true = ?QUEUE:is_empty(Q4), %% ASSERTION
+                             State1 #vqstate { q1 = ?QUEUE:new(), q4 = Q1 };
+                         {true, false} ->
+                             %% q3 drained but delta still has
+                             %% messages: pull the next index segment
+                             %% into q3.
+                             maybe_deltas_to_betas(State1);
+                         {false, _} ->
+                             %% q3 still isn't empty, we've not
+                             %% touched delta, so the invariants
+                             %% between q1, q2, delta and q3 are
+                             %% maintained
+                             State1
+                     end,
+            {loaded, {MsgStatus, State2}}
+    end;
+%% lazy queues
+fetch_from_q3(State = #vqstate { mode  = lazy,
+                                 delta = #delta { count = DeltaCount },
+                                 q3    = Q3 }) ->
+    case ?QUEUE:out(Q3) of
+        {empty, _Q3} when DeltaCount =:= 0 ->
+            {empty, State};
+        {empty, _Q3} ->
+            %% q3 empty but delta isn't: load a segment and retry.
+            fetch_from_q3(maybe_deltas_to_betas(State));
+        {{value, MsgStatus}, Q3a} ->
+            State1 = State #vqstate { q3 = Q3a },
+            {loaded, {MsgStatus, State1}}
+    end.
+
+%% Loads the next index segment of delta into q3 as betas, using the
+%% standard deliver-and-ack handling for entries that turn out to be
+%% already delivered/acked.
+maybe_deltas_to_betas(State) ->
+    AfterFun = process_delivers_and_acks_fun(deliver_and_ack),
+    maybe_deltas_to_betas(AfterFun, State).
+
+maybe_deltas_to_betas(_DelsAndAcksFun,
+                      State = #vqstate {delta = ?BLANK_DELTA_PATTERN(X) }) ->
+    State;
+maybe_deltas_to_betas(DelsAndAcksFun,
+                      State = #vqstate {
+                        q2                   = Q2,
+                        delta                = Delta,
+                        q3                   = Q3,
+                        index_state          = IndexState,
+                        ram_msg_count        = RamMsgCount,
+                        ram_bytes            = RamBytes,
+                        disk_read_count      = DiskReadCount,
+                        transient_threshold  = TransientThreshold }) ->
+    #delta { start_seq_id = DeltaSeqId,
+             count        = DeltaCount,
+             end_seq_id   = DeltaSeqIdEnd } = Delta,
+    %% Read at most one segment's worth of index entries.
+    DeltaSeqId1 =
+        lists:min([rabbit_queue_index:next_segment_boundary(DeltaSeqId),
+                   DeltaSeqIdEnd]),
+    {List, IndexState1} = rabbit_queue_index:read(DeltaSeqId, DeltaSeqId1,
+                                                  IndexState),
+    {Q3a, RamCountsInc, RamBytesInc, State1} =
+        betas_from_index_entries(List, TransientThreshold,
+                                 DelsAndAcksFun,
+                                 State #vqstate { index_state = IndexState1 }),
+    State2 = State1 #vqstate { ram_msg_count     = RamMsgCount   + RamCountsInc,
+                               ram_bytes         = RamBytes      + RamBytesInc,
+                               disk_read_count   = DiskReadCount + RamCountsInc },
+    case ?QUEUE:len(Q3a) of
+        0 ->
+            %% we ignored every message in the segment due to it being
+            %% transient and below the threshold
+            maybe_deltas_to_betas(
+              DelsAndAcksFun,
+              State2 #vqstate {
+                delta = d(Delta #delta { start_seq_id = DeltaSeqId1 })});
+        Q3aLen ->
+            Q3b = ?QUEUE:join(Q3, Q3a),
+            case DeltaCount - Q3aLen of
+                0 ->
+                    %% delta is now empty, but it wasn't before, so
+                    %% can now join q2 onto q3
+                    State2 #vqstate { q2    = ?QUEUE:new(),
+                                      delta = ?BLANK_DELTA,
+                                      q3    = ?QUEUE:join(Q3b, Q2) };
+                N when N > 0 ->
+                    Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
+                                        count        = N,
+                                        end_seq_id   = DeltaSeqIdEnd }),
+                    State2 #vqstate { delta = Delta1,
+                                      q3    = Q3b }
+            end
+    end.
+
+%% Converts alphas to betas within Quota: first from the front of q1
+%% (into q3 if delta is empty, else q2), then from the back of q4
+%% (into the front of q3). Returns the remaining quota.
+push_alphas_to_betas(Quota, State) ->
+    {Quota1, State1} =
+        push_alphas_to_betas(
+          fun ?QUEUE:out/1,
+          fun (MsgStatus, Q1a,
+               State0 = #vqstate { q3 = Q3, delta = #delta { count = 0 } }) ->
+                  State0 #vqstate { q1 = Q1a, q3 = ?QUEUE:in(MsgStatus, Q3) };
+              (MsgStatus, Q1a, State0 = #vqstate { q2 = Q2 }) ->
+                  State0 #vqstate { q1 = Q1a, q2 = ?QUEUE:in(MsgStatus, Q2) }
+          end, Quota, State #vqstate.q1, State),
+    {Quota2, State2} =
+        push_alphas_to_betas(
+          fun ?QUEUE:out_r/1,
+          fun (MsgStatus, Q4a, State0 = #vqstate { q3 = Q3 }) ->
+                  State0 #vqstate { q3 = ?QUEUE:in_r(MsgStatus, Q3), q4 = Q4a }
+          end, Quota1, State1 #vqstate.q4, State1),
+    {Quota2, State2}.
+
+%% Worker: takes messages from Q via Generator, pages each to disk
+%% (batched index writes), and hands the trimmed beta to Consumer.
+%% Stops when the quota is spent, RAM usage is within target, the
+%% source queue is drained, or credit_flow blocks us; every exit path
+%% flushes the index batch cache via ui/1.
+push_alphas_to_betas(_Generator, _Consumer, Quota, _Q,
+                     State = #vqstate { ram_msg_count    = RamMsgCount,
+                                        target_ram_count = TargetRamCount })
+  when Quota =:= 0 orelse
+       TargetRamCount =:= infinity orelse
+       TargetRamCount >= RamMsgCount ->
+    {Quota, ui(State)};
+push_alphas_to_betas(Generator, Consumer, Quota, Q, State) ->
+    %% We consume credits from the message_store whenever we need to
+    %% persist a message to disk. See:
+    %% rabbit_variable_queue:msg_store_write/4. So perhaps the
+    %% msg_store is trying to throttle down our queue.
+    case credit_flow:blocked() of
+        true  -> {Quota, ui(State)};
+        false -> case Generator(Q) of
+                     {empty, _Q} ->
+                         {Quota, ui(State)};
+                     {{value, MsgStatus}, Qa} ->
+                         {MsgStatus1, State1} =
+                             maybe_prepare_write_to_disk(true, false, MsgStatus,
+                                                         State),
+                         MsgStatus2 = m(trim_msg_status(MsgStatus1)),
+                         State2 = stats(
+                                    ready0, {MsgStatus, MsgStatus2}, State1),
+                         State3 = Consumer(MsgStatus2, Qa, State2),
+                         push_alphas_to_betas(Generator, Consumer, Quota - 1,
+                                              Qa, State3)
+                 end
+    end.
+
+%% Converts betas to delta within Quota: from the back of q3 (but not
+%% past the next segment boundary) and then the front of q2.
+push_betas_to_deltas(Quota, State = #vqstate { mode  = default,
+                                               q2    = Q2,
+                                               delta = Delta,
+                                               q3    = Q3}) ->
+    PushState = {Quota, Delta, State},
+    {Q3a, PushState1} = push_betas_to_deltas(
+                          fun ?QUEUE:out_r/1,
+                          fun rabbit_queue_index:next_segment_boundary/1,
+                          Q3, PushState),
+    {Q2a, PushState2} = push_betas_to_deltas(
+                          fun ?QUEUE:out/1,
+                          fun (Q2MinSeqId) -> Q2MinSeqId end,
+                          Q2, PushState1),
+    {_, Delta1, State1} = PushState2,
+    State1 #vqstate { q2    = Q2a,
+                      delta = Delta1,
+                      q3    = Q3a };
+%% In the case of lazy queues we want to page as many messages as
+%% possible from q3.
+push_betas_to_deltas(Quota, State = #vqstate { mode  = lazy,
+                                               delta = Delta,
+                                               q3    = Q3}) ->
+    PushState = {Quota, Delta, State},
+    {Q3a, PushState1} = push_betas_to_deltas(
+                          fun ?QUEUE:out_r/1,
+                          fun (Q2MinSeqId) -> Q2MinSeqId end,
+                          Q3, PushState),
+    {_, Delta1, State1} = PushState1,
+    State1 #vqstate { delta = Delta1,
+                      q3    = Q3a }.
+
+
+%% Decides whether any pushing is worthwhile: skips entirely when the
+%% queue is empty or lies wholly below LimitFun(MinSeqId).
+push_betas_to_deltas(Generator, LimitFun, Q, PushState) ->
+    case ?QUEUE:is_empty(Q) of
+        true ->
+            {Q, PushState};
+        false ->
+            {value, #msg_status { seq_id = MinSeqId }} = ?QUEUE:peek(Q),
+            {value, #msg_status { seq_id = MaxSeqId }} = ?QUEUE:peek_r(Q),
+            Limit = LimitFun(MinSeqId),
+            case MaxSeqId < Limit of
+                true  -> {Q, PushState};
+                false -> push_betas_to_deltas1(Generator, Limit, Q, PushState)
+            end
+    end.
+
+%% Pops messages via Generator while their seq id is >= Limit and
+%% quota remains, batch-writing each index entry and folding the seq
+%% id into delta. Every exit path flushes the batch cache via ui/1.
+push_betas_to_deltas1(_Generator, _Limit, Q, {0, Delta, State}) ->
+    {Q, {0, Delta, ui(State)}};
+push_betas_to_deltas1(Generator, Limit, Q, {Quota, Delta, State}) ->
+    case Generator(Q) of
+        {empty, _Q} ->
+            {Q, {Quota, Delta, ui(State)}};
+        {{value, #msg_status { seq_id = SeqId }}, _Qa}
+          when SeqId < Limit ->
+            {Q, {Quota, Delta, ui(State)}};
+        {{value, MsgStatus = #msg_status { seq_id = SeqId }}, Qa} ->
+            {#msg_status { index_on_disk = true }, State1} =
+                maybe_batch_write_index_to_disk(true, MsgStatus, State),
+            State2 = stats(ready0, {MsgStatus, none}, State1),
+            Delta1 = expand_delta(SeqId, Delta),
+            push_betas_to_deltas1(Generator, Limit, Qa,
+                                  {Quota - 1, Delta1, State2})
+    end.
+
+%% Flushes queue index batch caches and updates queue index state.
+ui(#vqstate{index_state = IndexState,
+ target_ram_count = TargetRamCount} = State) ->
+ IndexState1 = rabbit_queue_index:flush_pre_publish_cache(
+ TargetRamCount, IndexState),
+ State#vqstate{index_state = IndexState1}.
+
+%% Delay
+maybe_delay(QPA) ->
+ case is_timeout_test(gb_trees:values(QPA)) of
+ true -> receive
+ %% The queue received an EXIT message, it's probably the
+ %% node being stopped with "rabbitmqctl stop". Thus, abort
+ %% the wait and requeue the EXIT message.
+ {'EXIT', _, shutdown} = ExitMsg -> self() ! ExitMsg,
+ void
+ after infinity -> void
+ end;
+ _ -> void
+ end.
+
+is_timeout_test([]) -> false;
+is_timeout_test([#msg_status{
+ msg = #basic_message{
+ content = #content{
+ payload_fragments_rev = PFR}}}|Rem]) ->
+ case lists:member(?TIMEOUT_TEST_MSG, PFR) of
+ T = true -> T;
+ _ -> is_timeout_test(Rem)
+ end;
+is_timeout_test([_|Rem]) -> is_timeout_test(Rem).
+
+%%----------------------------------------------------------------------------
+%% Upgrading
+%%----------------------------------------------------------------------------
+
+%% On-disk upgrade step: rewrites stored basic_messages so the single
+%% routing key becomes a one-element list of routing keys.
+multiple_routing_keys() ->
+    transform_storage(
+      fun ({basic_message, ExchangeName, Routing_Key, Content,
+            MsgId, Persistent}) ->
+              {ok, {basic_message, ExchangeName, [Routing_Key], Content,
+                    MsgId, Persistent}};
+          (_) -> {error, corrupt_message}
+      end),
+    ok.
+
+
+%% Assumes message store is not running
+transform_storage(TransformFun) ->
+    transform_store(?PERSISTENT_MSG_STORE, TransformFun),
+    transform_store(?TRANSIENT_MSG_STORE, TransformFun).
+
+%% Forces recovery of one store's directory and applies TransformFun
+%% to every stored message.
+transform_store(Store, TransformFun) ->
+    rabbit_msg_store:force_recovery(rabbit_mnesia:dir(), Store),
+    rabbit_msg_store:transform_dir(rabbit_mnesia:dir(), Store, TransformFun).
diff --git a/test/cluster_rename_SUITE.erl b/test/cluster_rename_SUITE.erl
new file mode 100644
index 0000000000..8ce29a6695
--- /dev/null
+++ b/test/cluster_rename_SUITE.erl
@@ -0,0 +1,304 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(cluster_rename_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
%% Testcases are grouped by the cluster size they require.
all() ->
    [
      {group, cluster_size_2},
      {group, cluster_size_3}
    ].

groups() ->
    [
      {cluster_size_2, [], [
          % XXX post_change_nodename,
          abortive_rename,
          rename_fail,
          rename_twice_fail
        ]},
      {cluster_size_3, [], [
          rename_cluster_one_by_one,
          rename_cluster_big_bang,
          partial_one_by_one,
          partial_big_bang
        ]}
    ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config).

%% Record the requested cluster size; init_per_testcase/2 replaces the
%% count with a list of per-testcase node names.
init_per_group(cluster_size_2, Config) ->
    rabbit_ct_helpers:set_config(Config, [
        {rmq_nodes_count, 2} %% Replaced with a list of node names later.
      ]);
init_per_group(cluster_size_3, Config) ->
    rabbit_ct_helpers:set_config(Config, [
        {rmq_nodes_count, 3} %% Replaced with a list of node names later.
      ]).

end_per_group(_, Config) ->
    Config.
+
%% Builds one node name per cluster member, derived from the testcase
%% name (e.g. `rename_fail-1'), then boots a pre-clustered broker and
%% the AMQP client helpers for the test.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    ClusterSize = ?config(rmq_nodes_count, Config),
    Nodenames = [
      list_to_atom(rabbit_misc:format("~s-~b", [Testcase, I]))
      || I <- lists:seq(1, ClusterSize)
    ],
    Config1 = rabbit_ct_helpers:set_config(Config, [
        {rmq_nodes_count, Nodenames},
        {rmq_nodes_clustered, true}
      ]),
    rabbit_ct_helpers:run_steps(Config1,
      rabbit_ct_broker_helpers:setup_steps() ++
      rabbit_ct_client_helpers:setup_steps()).
+
%% Tears down the broker and client helpers.  Testcases that renamed
%% nodes return {save_config, Config}; prefer that saved config so
%% teardown addresses the nodes by their current (renamed) names.
end_per_testcase(Testcase, Config) ->
    BaseConfig = case rabbit_ct_helpers:get_config(Config, save_config) of
                     undefined -> Config;
                     Saved     -> Saved
                 end,
    TeardownSteps = rabbit_ct_client_helpers:teardown_steps() ++
                    rabbit_ct_broker_helpers:teardown_steps(),
    FinalConfig = rabbit_ct_helpers:run_steps(BaseConfig, TeardownSteps),
    rabbit_ct_helpers:testcase_finished(FinalConfig, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
%% Rolling rename of a cluster, each node should do a secondary rename.
%% Messages published before the renames must still be consumable from
%% the renamed nodes afterwards.
rename_cluster_one_by_one(Config) ->
    [Node1, Node2, Node3] = rabbit_ct_broker_helpers:get_node_configs(
                              Config, nodename),
    publish_all(Config,
      [{Node1, <<"1">>}, {Node2, <<"2">>}, {Node3, <<"3">>}]),

    %% Rename the nodes one at a time; the other two keep running.
    Config1 = stop_rename_start(Config,  Node1, [Node1, jessica]),
    Config2 = stop_rename_start(Config1, Node2, [Node2, hazel]),
    Config3 = stop_rename_start(Config2, Node3, [Node3, flopsy]),

    [Jessica, Hazel, Flopsy] = rabbit_ct_broker_helpers:get_node_configs(
                                 Config3, nodename),
    consume_all(Config3,
      [{Jessica, <<"1">>}, {Hazel, <<"2">>}, {Flopsy, <<"3">>}]),
    %% Hand the updated config to end_per_testcase/2 for teardown.
    {save_config, Config3}.
+
%% Big bang rename of a cluster, Node1 should do a primary rename.
%% All nodes are stopped first, every node is renamed with the full
%% old/new mapping, then all are restarted.
rename_cluster_big_bang(Config) ->
    [Node1, Node2, Node3] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    publish_all(Config,
      [{Node1, <<"1">>}, {Node2, <<"2">>}, {Node3, <<"3">>}]),

    ok = rabbit_ct_broker_helpers:stop_node(Config, Node3),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node1),

    %% Each node is given the complete rename mapping for the cluster.
    Map = [Node1, jessica, Node2, hazel, Node3, flopsy],
    Config1 = rename_node(Config, Node1, Map),
    Config2 = rename_node(Config1, Node2, Map),
    Config3 = rename_node(Config2, Node3, Map),

    [Jessica, Hazel, Flopsy] = rabbit_ct_broker_helpers:get_node_configs(
                                 Config3, nodename),
    ok = rabbit_ct_broker_helpers:start_node(Config3, Jessica),
    ok = rabbit_ct_broker_helpers:start_node(Config3, Hazel),
    ok = rabbit_ct_broker_helpers:start_node(Config3, Flopsy),

    consume_all(Config3,
      [{Jessica, <<"1">>}, {Hazel, <<"2">>}, {Flopsy, <<"3">>}]),
    {save_config, Config3}.
+
%% Here we test that Node1 copes with things being renamed around it:
%% only Node2 and Node3's peers are renamed, Node3 keeps its name.
partial_one_by_one(Config) ->
    [Node1, Node2, Node3] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    publish_all(Config,
      [{Node1, <<"1">>}, {Node2, <<"2">>}, {Node3, <<"3">>}]),

    Config1 = stop_rename_start(Config, Node1, [Node1, jessica]),
    Config2 = stop_rename_start(Config1, Node2, [Node2, hazel]),

    [Jessica, Hazel, Node3] = rabbit_ct_broker_helpers:get_node_configs(
                                Config2, nodename),
    consume_all(Config2,
      [{Jessica, <<"1">>}, {Hazel, <<"2">>}, {Node3, <<"3">>}]),
    {save_config, Config2}.
+
%% Here we test that Node1 copes with things being renamed around it:
%% the whole cluster is stopped, but only Node2 and Node3 are renamed
%% before everything is restarted.
partial_big_bang(Config) ->
    [Node1, Node2, Node3] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    publish_all(Config,
      [{Node1, <<"1">>}, {Node2, <<"2">>}, {Node3, <<"3">>}]),

    ok = rabbit_ct_broker_helpers:stop_node(Config, Node3),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node2),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node1),

    %% Node1 is deliberately absent from the rename mapping.
    Map = [Node2, hazel, Node3, flopsy],
    Config1 = rename_node(Config, Node2, Map),
    Config2 = rename_node(Config1, Node3, Map),

    [Node1, Hazel, Flopsy] = rabbit_ct_broker_helpers:get_node_configs(Config2,
      nodename),
    ok = rabbit_ct_broker_helpers:start_node(Config2, Node1),
    ok = rabbit_ct_broker_helpers:start_node(Config2, Hazel),
    ok = rabbit_ct_broker_helpers:start_node(Config2, Flopsy),

    consume_all(Config2,
      [{Node1, <<"1">>}, {Hazel, <<"2">>}, {Flopsy, <<"3">>}]),
    {save_config, Config2}.
+
+% XXX %% We should be able to specify the -n parameter on ctl with either
+% XXX %% the before or after name for the local node (since in real cases
+% XXX %% one might want to invoke the command before or after the hostname
+% XXX %% has changed) - usually we test before so here we test after.
+% XXX post_change_nodename([Node1, _Bigwig]) ->
+% XXX publish(Node1, <<"Node1">>),
+% XXX
+% XXX Bugs1 = rabbit_test_configs:stop_node(Node1),
+% XXX Bugs2 = [{nodename, jessica} | proplists:delete(nodename, Bugs1)],
+% XXX Jessica0 = rename_node(Bugs2, jessica, [Node1, jessica]),
+% XXX Jessica = rabbit_test_configs:start_node(Jessica0),
+% XXX
+% XXX consume(Jessica, <<"Node1">>),
+% XXX stop_all([Jessica]),
+% XXX ok.
+
%% If we invoke rename but the node name does not actually change, we
%% should roll back.  The node is restarted with its ORIGINAL config
%% (note: the rename's updated config is deliberately discarded) and
%% must still hold the published message.
abortive_rename(Config) ->
    Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    publish(Config, Node1, <<"Node1">>),

    ok = rabbit_ct_broker_helpers:stop_node(Config, Node1),
    _Config1 = rename_node(Config, Node1, [Node1, jessica]),
    ok = rabbit_ct_broker_helpers:start_node(Config, Node1),

    consume(Config, Node1, <<"Node1">>),
    ok.
+
%% And test some ways the command can fail: bad source name, renaming
%% onto an existing cluster member, duplicate targets, and running the
%% command while impersonating a node outside the cluster.
rename_fail(Config) ->
    [Node1, Node2] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node1),
    %% Rename from a node that does not exist
    ok = rename_node_fail(Config, Node1, [bugzilla, jessica]),
    %% Rename to a node which does
    ok = rename_node_fail(Config, Node1, [Node1, Node2]),
    %% Rename two nodes to the same thing
    ok = rename_node_fail(Config, Node1, [Node1, jessica, Node2, jessica]),
    %% Rename while impersonating a node not in the cluster
    Config1 = rabbit_ct_broker_helpers:set_node_config(Config, Node1,
      {nodename, 'rabbit@localhost'}),
    ok = rename_node_fail(Config1, Node1, [Node1, jessica]),
    ok.
+
%% Renaming the same node a second time must fail.
rename_twice_fail(Config) ->
    Node1 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Node1),
    Config1 = rename_node(Config, Node1, [Node1, indecisive]),
    %% NOTE(review): this passes the pre-rename `Config' and the old
    %% node name, although after the rename above the node is recorded
    %% as `indecisive' in `Config1'.  Looks like it should use Config1
    %% (and the new name) here — confirm against the broker helpers.
    ok = rename_node_fail(Config, Node1, [indecisive, jessica]),
    {save_config, Config1}.
+
%% ----------------------------------------------------------------------------

%% Stops a node, renames it according to Map and starts it again.
%% Returns the updated config.
stop_rename_start(Config, Nodename, Map) ->
    ok = rabbit_ct_broker_helpers:stop_node(Config, Nodename),
    Config1 = rename_node(Config, Nodename, Map),
    ok = rabbit_ct_broker_helpers:start_node(Config1, Nodename),
    Config1.

%% Renames a (stopped) node; the rename must succeed.
rename_node(Config, Nodename, Map) ->
    {ok, Config1} = do_rename_node(Config, Nodename, Map),
    Config1.

%% Renames a (stopped) node; the rename must fail.
rename_node_fail(Config, Nodename, Map) ->
    error = do_rename_node(Config, Nodename, Map),
    ok.
+
%% Runs `rabbitmqctl rename_cluster_node' on Nodename with the given
%% [Old1, New1, Old2, New2, ...] mapping.  Short names in the mapping
%% are qualified with the "localhost" hostname first.  Returns
%% {ok, UpdatedConfig} on success or `error' if the command failed.
do_rename_node(Config, Nodename, Map) ->
    QualifiedMap = [qualify_node_name(Name) || Name <- Map],
    CtlArgs = ["rename_cluster_node" | QualifiedMap],
    case rabbit_ct_broker_helpers:rabbitmqctl(Config, Nodename, CtlArgs) of
        {ok, _} ->
            {ok, update_config_after_rename(Config, QualifiedMap)};
        {error, _, _} ->
            error
    end.

%% Leaves fully-qualified names (containing `@') untouched; turns short
%% names into `name@localhost' node atoms.
qualify_node_name(Node) ->
    NodeStr = atom_to_list(Node),
    case lists:member($@, NodeStr) of
        true  -> Node;
        false -> rabbit_nodes:make({NodeStr, "localhost"})
    end.
+
%% Walks the [Old, New, ...] rename mapping pairwise and records each
%% node's new name in the testcase config.
update_config_after_rename(Config, []) ->
    Config;
update_config_after_rename(Config, [OldName, NewName | Remaining]) ->
    UpdatedConfig = rabbit_ct_broker_helpers:set_node_config(
                      Config, OldName, {nodename, NewName}),
    update_config_after_rename(UpdatedConfig, Remaining).
+
%% Declares durable queue Q on Node and publishes one persistent
%% message whose payload equals the queue name, waiting for publisher
%% confirms so the message is safely stored before the node may be
%% stopped or renamed.
publish(Config, Node, Q) ->
    Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
    amqp_channel:call(Ch, #'confirm.select'{}),
    amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}),
    amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
      #amqp_msg{props = #'P_basic'{delivery_mode = 2},
                payload = Q}),
    amqp_channel:wait_for_confirms(Ch),
    rabbit_ct_client_helpers:close_channels_and_connection(Config, Node).

%% Asserts that queue Q on Node still holds the message published by
%% publish/3 (payload equal to the queue name).
consume(Config, Node, Q) ->
    Ch = rabbit_ct_client_helpers:open_channel(Config, Node),
    amqp_channel:call(Ch, #'queue.declare'{queue = Q, durable = true}),
    {#'basic.get_ok'{}, #amqp_msg{payload = Q}} =
        amqp_channel:call(Ch, #'basic.get'{queue = Q}),
    rabbit_ct_client_helpers:close_channels_and_connection(Config, Node).


%% Publishes/consumes one message per {Node, Payload} pair.
publish_all(Config, Nodes) ->
    [publish(Config, Node, Key) || {Node, Key} <- Nodes].

consume_all(Config, Nodes) ->
    [consume(Config, Node, Key) || {Node, Key} <- Nodes].
+
%% Replaces any `nodename' entries in the proplist Cfg with Nodename.
%% NOTE(review): appears unused in this common_test-based suite — looks
%% like a leftover from the previous test framework; confirm and drop.
set_node(Nodename, Cfg) ->
    [{nodename, Nodename} | proplists:delete(nodename, Cfg)].
diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl
new file mode 100644
index 0000000000..00ddfa48a2
--- /dev/null
+++ b/test/clustering_management_SUITE.erl
@@ -0,0 +1,728 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(clustering_management_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(LOOP_RECURSION_DELAY, 100).
+
all() ->
    [
      {group, unclustered},
      {group, clustered}
    ].

%% Tests are grouped first by whether the broker nodes boot already
%% clustered, then by the cluster size they require.
groups() ->
    [
      {unclustered, [], [
          {cluster_size_2, [], [
              erlang_config
            ]},
          {cluster_size_3, [], [
              join_and_part_cluster,
              join_cluster_bad_operations,
              join_to_start_interval,
              forget_cluster_node,
              change_cluster_node_type,
              change_cluster_when_node_offline,
              update_cluster_nodes,
              force_reset_node
            ]}
        ]},
      {clustered, [], [
          {cluster_size_2, [], [
              forget_removes_things,
              reset_removes_things,
              forget_offline_removes_things,
              force_boot,
              status_with_alarm
            ]},
          {cluster_size_4, [], [
              forget_promotes_offline_slave
            ]}
        ]}
    ].

suite() ->
    [
      %% If a test hangs, no need to wait for 30 minutes.
      {timetrap, {minutes, 5}}
    ].
+
%% -------------------------------------------------------------------
%% Testsuite setup/teardown.
%% -------------------------------------------------------------------

init_per_suite(Config) ->
    rabbit_ct_helpers:log_environment(),
    rabbit_ct_helpers:run_setup_steps(Config).

end_per_suite(Config) ->
    rabbit_ct_helpers:run_teardown_steps(Config).

%% Outer groups toggle whether nodes boot pre-clustered; nested groups
%% set the node count.  Both settings combine in the final config.
init_per_group(unclustered, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
init_per_group(clustered, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
init_per_group(cluster_size_2, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]);
init_per_group(cluster_size_3, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]);
init_per_group(cluster_size_4, Config) ->
    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 4}]).

end_per_group(_, Config) ->
    Config.
+
%% Boots a fresh broker (cluster) for every testcase, with node names
%% suffixed by the testcase name and TCP ports offset per testcase
%% number so suites do not collide.
init_per_testcase(Testcase, Config) ->
    rabbit_ct_helpers:testcase_started(Config, Testcase),
    ClusterSize = ?config(rmq_nodes_count, Config),
    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
    Config1 = rabbit_ct_helpers:set_config(Config, [
        {rmq_nodename_suffix, Testcase},
        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
      ]),
    rabbit_ct_helpers:run_steps(Config1,
      rabbit_ct_broker_helpers:setup_steps() ++
      rabbit_ct_client_helpers:setup_steps()).

end_per_testcase(Testcase, Config) ->
    Config1 = rabbit_ct_helpers:run_steps(Config,
      rabbit_ct_client_helpers:teardown_steps() ++
      rabbit_ct_broker_helpers:teardown_steps()),
    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
%% -------------------------------------------------------------------
%% Testcases.
%% -------------------------------------------------------------------

%% Clusters three standalone nodes (Hare joins as a RAM node), then
%% resets them back out one by one, checking the reported cluster
%% status after every step.
join_and_part_cluster(Config) ->
    [Rabbit, Hare, Bunny] = cluster_members(Config),
    assert_not_clustered(Rabbit),
    assert_not_clustered(Hare),
    assert_not_clustered(Bunny),

    stop_join_start(Rabbit, Bunny),
    assert_clustered([Rabbit, Bunny]),

    stop_join_start(Hare, Bunny, true),
    assert_cluster_status(
      {[Bunny, Hare, Rabbit], [Bunny, Rabbit], [Bunny, Hare, Rabbit]},
      [Rabbit, Hare, Bunny]),

    %% Allow clustering with already clustered node
    ok = stop_app(Rabbit),
    {ok, already_member} = join_cluster(Rabbit, Hare),
    ok = start_app(Rabbit),

    stop_reset_start(Rabbit),
    assert_not_clustered(Rabbit),
    assert_cluster_status({[Bunny, Hare], [Bunny], [Bunny, Hare]},
                          [Hare, Bunny]),

    stop_reset_start(Hare),
    assert_not_clustered(Hare),
    assert_not_clustered(Bunny).
+
%% Exercises the failure modes of join_cluster: unreachable peer,
%% mnesia still running, clustering a node with itself, and the
%% restrictions that apply while the cluster's only RAM node is down.
join_cluster_bad_operations(Config) ->
    [Rabbit, Hare, Bunny] = cluster_members(Config),

    %% Non-existant node
    ok = stop_app(Rabbit),
    assert_failure(fun () -> join_cluster(Rabbit, non@existant) end),
    ok = start_app(Rabbit),
    assert_not_clustered(Rabbit),

    %% Trying to cluster with mnesia running
    assert_failure(fun () -> join_cluster(Rabbit, Bunny) end),
    assert_not_clustered(Rabbit),

    %% Trying to cluster the node with itself
    ok = stop_app(Rabbit),
    assert_failure(fun () -> join_cluster(Rabbit, Rabbit) end),
    ok = start_app(Rabbit),
    assert_not_clustered(Rabbit),

    %% Do not let the node leave the cluster or reset if it's the only
    %% ram node
    stop_join_start(Hare, Rabbit, true),
    assert_cluster_status({[Rabbit, Hare], [Rabbit], [Rabbit, Hare]},
                          [Rabbit, Hare]),
    ok = stop_app(Hare),
    assert_failure(fun () -> join_cluster(Rabbit, Bunny) end),
    assert_failure(fun () -> reset(Rabbit) end),
    ok = start_app(Hare),
    assert_cluster_status({[Rabbit, Hare], [Rabbit], [Rabbit, Hare]},
                          [Rabbit, Hare]),

    %% Cannot start RAM-only node first
    ok = stop_app(Rabbit),
    ok = stop_app(Hare),
    assert_failure(fun () -> start_app(Hare) end),
    ok = start_app(Rabbit),
    ok = start_app(Hare),
    ok.
+
%% This tests that the nodes in the cluster are notified immediately of a node
%% join, and not just after the app is started.
join_to_start_interval(Config) ->
    [Rabbit, Hare, _Bunny] = cluster_members(Config),

    ok = stop_app(Rabbit),
    ok = join_cluster(Rabbit, Hare),
    %% Hare already reports Rabbit as a (non-running) member before
    %% Rabbit's rabbit application is started.
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
                          [Rabbit, Hare]),
    ok = start_app(Rabbit),
    assert_clustered([Rabbit, Hare]).
+
%% Walks through forget_cluster_node: it rejects nodes not in the
%% cluster, online nodes and misuse of --offline; then removes a
%% stopped member from a running peer, and finally removes one with
%% `--offline' while the executing node itself is stopped.
forget_cluster_node(Config) ->
    [Rabbit, Hare, Bunny] = cluster_members(Config),

    %% Trying to remove a node not in the cluster should fail
    assert_failure(fun () -> forget_cluster_node(Hare, Rabbit) end),

    stop_join_start(Rabbit, Hare),
    assert_clustered([Rabbit, Hare]),

    %% Trying to remove an online node should fail
    assert_failure(fun () -> forget_cluster_node(Hare, Rabbit) end),

    ok = stop_app(Rabbit),
    %% We're passing the --offline flag, but Hare is online
    assert_failure(fun () -> forget_cluster_node(Hare, Rabbit, true) end),
    %% Removing some non-existant node will fail
    assert_failure(fun () -> forget_cluster_node(Hare, non@existant) end),
    ok = forget_cluster_node(Hare, Rabbit),
    assert_not_clustered(Hare),
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
                          [Rabbit]),

    %% Now we can't start Rabbit since it thinks that it's still in the cluster
    %% with Hare, while Hare disagrees.
    assert_failure(fun () -> start_app(Rabbit) end),

    ok = reset(Rabbit),
    ok = start_app(Rabbit),
    assert_not_clustered(Rabbit),

    %% Now we remove Rabbit from an offline node.
    stop_join_start(Bunny, Hare),
    stop_join_start(Rabbit, Hare),
    assert_clustered([Rabbit, Hare, Bunny]),
    ok = stop_app(Hare),
    ok = stop_app(Rabbit),
    ok = stop_app(Bunny),
    %% This is fine but we need the flag
    assert_failure(fun () -> forget_cluster_node(Hare, Bunny) end),
    %% Also fails because hare node is still running
    assert_failure(fun () -> forget_cluster_node(Hare, Bunny, true) end),
    %% But this works
    ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Hare,
      ["forget_cluster_node", "--offline", Bunny]),
    ok = rabbit_ct_broker_helpers:start_node(Config, Hare),
    ok = start_app(Rabbit),
    %% Bunny still thinks its clustered with Rabbit and Hare
    assert_failure(fun () -> start_app(Bunny) end),
    ok = reset(Bunny),
    ok = start_app(Bunny),
    assert_not_clustered(Bunny),
    assert_clustered([Rabbit, Hare]).
+
forget_removes_things(Config) ->
    test_removes_things(Config, fun (R, H) -> ok = forget_cluster_node(H, R) end).

reset_removes_things(Config) ->
    test_removes_things(Config, fun (R, _H) -> ok = reset(R) end).

%% Common body for the two tests above: once Rabbit is removed from the
%% cluster (by forget_cluster_node on Hare, or by resetting Rabbit),
%% its durable queue must be gone, so Hare can re-declare the name.
test_removes_things(Config, LoseRabbit) ->
    Unmirrored = <<"unmirrored-queue">>,
    [Rabbit, Hare] = cluster_members(Config),
    RCh = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
    declare(RCh, Unmirrored),
    ok = stop_app(Rabbit),

    %% The queue still exists cluster-wide but its home node is down,
    %% so re-declaring it on Hare fails with a 404.
    HCh = rabbit_ct_client_helpers:open_channel(Config, Hare),
    {'EXIT',{{shutdown,{server_initiated_close,404,_}}, _}} =
        (catch declare(HCh, Unmirrored)),

    ok = LoseRabbit(Rabbit, Hare),
    HCh2 = rabbit_ct_client_helpers:open_channel(Config, Hare),
    declare(HCh2, Unmirrored),
    ok.
+
%% Forgetting a (stopped) node with `--offline', run from a peer that
%% is itself stopped, must remove the forgotten node's durable queue
%% and its auto-delete exchange.
forget_offline_removes_things(Config) ->
    [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    Unmirrored = <<"unmirrored-queue">>,
    X = <<"X">>,
    RCh = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
    declare(RCh, Unmirrored),

    amqp_channel:call(RCh, #'exchange.declare'{durable = true,
                                               exchange = X,
                                               auto_delete = true}),
    amqp_channel:call(RCh, #'queue.bind'{queue = Unmirrored,
                                         exchange = X}),
    ok = rabbit_ct_broker_helpers:stop_broker(Config, Rabbit),

    HCh = rabbit_ct_client_helpers:open_channel(Config, Hare),
    {'EXIT',{{shutdown,{server_initiated_close,404,_}}, _}} =
        (catch declare(HCh, Unmirrored)),

    ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Rabbit),
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Hare,
      ["forget_cluster_node", "--offline", Rabbit]),
    ok = rabbit_ct_broker_helpers:start_node(Config, Hare),

    HCh2 = rabbit_ct_client_helpers:open_channel(Config, Hare),
    declare(HCh2, Unmirrored),
    %% The auto-delete exchange lost its only binding along with the
    %% queue, so a passive declare must now fail with a 404.
    {'EXIT',{{shutdown,{server_initiated_close,404,_}}, _}} =
        (catch amqp_channel:call(HCh2,#'exchange.declare'{durable = true,
                                                          exchange = X,
                                                          auto_delete = true,
                                                          passive = true})),
    ok.
+
%% Forgetting a down master must promote a "recoverable slave" that
%% still holds the message — even though that slave is itself offline.
forget_promotes_offline_slave(Config) ->
    [A, B, C, D] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
    Q = <<"mirrored-queue">>,
    declare(ACh, Q),
    set_ha_policy(Config, Q, A, [B, C]),
    set_ha_policy(Config, Q, A, [C, D]), %% Test add and remove from recoverable_slaves

    %% Publish and confirm
    amqp_channel:call(ACh, #'confirm.select'{}),
    amqp_channel:cast(ACh, #'basic.publish'{routing_key = Q},
                      #amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
    amqp_channel:wait_for_confirms(ACh),

    %% We kill nodes rather than stop them in order to make sure
    %% that we aren't dependent on anything that happens as they shut
    %% down (see bug 26467).
    ok = rabbit_ct_broker_helpers:kill_node(Config, D),
    ok = rabbit_ct_broker_helpers:kill_node(Config, C),
    ok = rabbit_ct_broker_helpers:kill_node(Config, B),
    ok = rabbit_ct_broker_helpers:kill_node(Config, A),

    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, C,
      ["force_boot"]),

    ok = rabbit_ct_broker_helpers:start_node(Config, C),

    %% We should now have the following dramatis personae:
    %% A - down, master
    %% B - down, used to be slave, no longer is, never had the message
    %% C - running, should be slave, but has wiped the message on restart
    %% D - down, recoverable slave, contains message
    %%
    %% So forgetting A should offline-promote the queue to D, keeping
    %% the message.

    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, C,
      ["forget_cluster_node", A]),

    ok = rabbit_ct_broker_helpers:start_node(Config, D),
    DCh2 = rabbit_ct_client_helpers:open_channel(Config, D),
    #'queue.declare_ok'{message_count = 1} = declare(DCh2, Q),
    ok.
+
%% Sets a nodes-mode HA policy pinning queue Q to the given master and
%% slave nodes, then blocks until the queue's mirrors match.
set_ha_policy(Config, Q, Master, Slaves) ->
    NodeBins = [atom_to_binary(Node, utf8) || Node <- [Master | Slaves]],
    rabbit_ct_broker_helpers:set_ha_policy(Config, Master, Q,
                                           {<<"nodes">>, NodeBins}),
    await_slaves(Q, Master, Slaves).
+
%% Polls (every 100ms) until queue Q's actual master and sorted slave
%% placement equals the requested one.  There is no retry bound here;
%% the suite's timetrap catches the case where mirroring never settles.
await_slaves(Q, Master, Slaves) ->
    {ok, #amqqueue{pid        = MPid,
                   slave_pids = SPids}} =
        rpc:call(Master, rabbit_amqqueue, lookup,
                 [rabbit_misc:r(<<"/">>, queue, Q)]),
    ActMaster = node(MPid),
    ActSlaves = lists:usort([node(P) || P <- SPids]),
    case {Master, lists:usort(Slaves)} of
        {ActMaster, ActSlaves} -> ok;
        _                      -> timer:sleep(100),
                                  await_slaves(Q, Master, Slaves)
    end.
+
%% `force_boot' must fail while the node is up; once both nodes are
%% stopped, a node that cannot boot on its own (its peer went down
%% after it) boots anyway after force_boot succeeds.
force_boot(Config) ->
    [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),
    {error, _, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Rabbit,
      ["force_boot"]),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Rabbit),
    ok = rabbit_ct_broker_helpers:stop_node(Config, Hare),
    {error, _} = rabbit_ct_broker_helpers:start_node(Config, Rabbit),
    {ok, _} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Rabbit,
      ["force_boot"]),
    ok = rabbit_ct_broker_helpers:start_node(Config, Rabbit),
    ok.
+
%% Moves a node between disc and ram node types, checking cluster
%% status after each change.  Type changes require the rabbit app to be
%% stopped, and are rejected on unclustered nodes.
change_cluster_node_type(Config) ->
    [Rabbit, Hare, _Bunny] = cluster_members(Config),

    %% Trying to change the ram node when not clustered should always fail
    ok = stop_app(Rabbit),
    assert_failure(fun () -> change_cluster_node_type(Rabbit, ram) end),
    assert_failure(fun () -> change_cluster_node_type(Rabbit, disc) end),
    ok = start_app(Rabbit),

    ok = stop_app(Rabbit),
    join_cluster(Rabbit, Hare),
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
                          [Rabbit, Hare]),
    change_cluster_node_type(Rabbit, ram),
    assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]},
                          [Rabbit, Hare]),
    change_cluster_node_type(Rabbit, disc),
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
                          [Rabbit, Hare]),
    change_cluster_node_type(Rabbit, ram),
    ok = start_app(Rabbit),
    assert_cluster_status({[Rabbit, Hare], [Hare], [Hare, Rabbit]},
                          [Rabbit, Hare]),

    %% Changing to ram when you're the only ram node should fail
    ok = stop_app(Hare),
    assert_failure(fun () -> change_cluster_node_type(Hare, ram) end),
    ok = start_app(Hare).
+
%% Changes cluster membership while one member is offline and checks
%% that every node converges on the correct view once restarted.
change_cluster_when_node_offline(Config) ->
    [Rabbit, Hare, Bunny] = cluster_members(Config),

    %% Cluster the three nodes
    stop_join_start(Rabbit, Hare),
    assert_clustered([Rabbit, Hare]),

    stop_join_start(Bunny, Hare),
    assert_clustered([Rabbit, Hare, Bunny]),

    %% Bring down Rabbit, and remove Bunny from the cluster while
    %% Rabbit is offline
    ok = stop_app(Rabbit),
    ok = stop_app(Bunny),
    ok = reset(Bunny),
    assert_cluster_status({[Bunny], [Bunny], []}, [Bunny]),
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]}, [Hare]),
    assert_cluster_status(
      {[Rabbit, Hare, Bunny], [Rabbit, Hare, Bunny], [Hare, Bunny]}, [Rabbit]),

    %% Bring Rabbit back up
    ok = start_app(Rabbit),
    assert_clustered([Rabbit, Hare]),
    ok = start_app(Bunny),
    assert_not_clustered(Bunny),

    %% Now the same, but Rabbit is a RAM node, and we bring up Bunny
    %% before
    ok = stop_app(Rabbit),
    ok = change_cluster_node_type(Rabbit, ram),
    ok = start_app(Rabbit),
    stop_join_start(Bunny, Hare),
    assert_cluster_status(
      {[Rabbit, Hare, Bunny], [Hare, Bunny], [Rabbit, Hare, Bunny]},
      [Rabbit, Hare, Bunny]),
    ok = stop_app(Rabbit),
    ok = stop_app(Bunny),
    ok = reset(Bunny),
    ok = start_app(Bunny),
    assert_not_clustered(Bunny),
    assert_cluster_status({[Rabbit, Hare], [Hare], [Hare]}, [Hare]),
    assert_cluster_status(
      {[Rabbit, Hare, Bunny], [Hare, Bunny], [Hare, Bunny]},
      [Rabbit]),
    ok = start_app(Rabbit),
    assert_cluster_status({[Rabbit, Hare], [Hare], [Rabbit, Hare]},
                          [Rabbit, Hare]),
    assert_not_clustered(Bunny).
+
%% update_cluster_nodes lets a stopped member refresh its view of the
%% cluster from another node, after the node it originally joined
%% through has been reset.
update_cluster_nodes(Config) ->
    [Rabbit, Hare, Bunny] = cluster_members(Config),

    %% Mnesia is running...
    assert_failure(fun () -> update_cluster_nodes(Rabbit, Hare) end),

    ok = stop_app(Rabbit),
    ok = join_cluster(Rabbit, Hare),
    ok = stop_app(Bunny),
    ok = join_cluster(Bunny, Hare),
    ok = start_app(Bunny),
    stop_reset_start(Hare),
    %% Rabbit only knew the cluster through Hare, which is now gone.
    assert_failure(fun () -> start_app(Rabbit) end),
    %% Bogus node
    assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existant) end),
    %% Inconsistent node
    assert_failure(fun () -> update_cluster_nodes(Rabbit, Hare) end),
    ok = update_cluster_nodes(Rabbit, Bunny),
    ok = start_app(Rabbit),
    assert_not_clustered(Hare),
    assert_clustered([Rabbit, Bunny]).
+
%% Covers clustering driven by the `cluster_nodes' application
%% environment: disc and ram joins, tolerated bad configurations that
%% only warn, and malformed configurations that must prevent boot.
erlang_config(Config) ->
    [Rabbit, Hare] = cluster_members(Config),

    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {[Rabbit], disc}]),
    ok = start_app(Hare),
    assert_clustered([Rabbit, Hare]),

    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {[Rabbit], ram}]),
    ok = start_app(Hare),
    assert_cluster_status({[Rabbit, Hare], [Rabbit], [Rabbit, Hare]},
                          [Rabbit, Hare]),

    %% Check having a stop_app'ed node around doesn't break completely.
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = stop_app(Rabbit),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {[Rabbit], disc}]),
    ok = start_app(Hare),
    ok = start_app(Rabbit),
    assert_not_clustered(Hare),
    assert_not_clustered(Rabbit),

    %% We get a warning but we start anyway
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {[non@existent], disc}]),
    ok = start_app(Hare),
    assert_not_clustered(Hare),
    assert_not_clustered(Rabbit),

    %% If we use a legacy config file, the node fails to start.
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, [Rabbit]]),
    assert_failure(fun () -> start_app(Hare) end),
    assert_not_clustered(Rabbit),

    %% If we use an invalid node name, the node fails to start.
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {["Mike's computer"], disc}]),
    assert_failure(fun () -> start_app(Hare) end),
    assert_not_clustered(Rabbit),

    %% If we use an invalid node type, the node fails to start.
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, {[Rabbit], blue}]),
    assert_failure(fun () -> start_app(Hare) end),
    assert_not_clustered(Rabbit),

    %% If we use an invalid cluster_nodes conf, the node fails to start.
    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, true]),
    assert_failure(fun () -> start_app(Hare) end),
    assert_not_clustered(Rabbit),

    ok = stop_app(Hare),
    ok = reset(Hare),
    ok = rpc:call(Hare, application, set_env,
                  [rabbit, cluster_nodes, "Yes, please"]),
    assert_failure(fun () -> start_app(Hare) end),
    assert_not_clustered(Rabbit).
+
%% force_reset wipes a node without consulting the cluster: Hare still
%% thinks Rabbit is a member, while Rabbit itself is standalone and
%% can rejoin via update_cluster_nodes.
force_reset_node(Config) ->
    [Rabbit, Hare, _Bunny] = cluster_members(Config),

    stop_join_start(Rabbit, Hare),
    stop_app(Rabbit),
    force_reset(Rabbit),
    %% Hare thinks that Rabbit is still clustered
    assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]},
                          [Hare]),
    %% ...but it isn't
    assert_cluster_status({[Rabbit], [Rabbit], []}, [Rabbit]),
    %% We can rejoin Rabbit and Hare
    update_cluster_nodes(Rabbit, Hare),
    start_app(Rabbit),
    assert_clustered([Rabbit, Hare]).
+
%% `cluster_status' output must include the memory alarm raised on
%% Rabbit and the disk alarm raised on Hare, no matter which node
%% prints it.
status_with_alarm(Config) ->
    [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
      nodename),

    %% Given: an alarm is raised on each node.
    rabbit_ct_broker_helpers:rabbitmqctl(Config, Rabbit,
      ["set_vm_memory_high_watermark", "0.000000001"]),
    rabbit_ct_broker_helpers:rabbitmqctl(Config, Hare,
      ["set_disk_free_limit", "2048G"]),

    %% When: we ask for cluster status.
    {ok, S} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Rabbit,
      ["cluster_status"]),
    {ok, R} = rabbit_ct_broker_helpers:rabbitmqctl(Config, Hare,
      ["cluster_status"]),

    %% Then: both nodes have printed alarm information for each other.
    ok = alarm_information_on_each_node(S, Rabbit, Hare),
    ok = alarm_information_on_each_node(R, Rabbit, Hare).
+
+
%% ----------------------------------------------------------------------------
%% Internal utils

cluster_members(Config) ->
    rabbit_ct_broker_helpers:get_node_configs(Config, nodename).

%% Asserts that each node in Nodes reports the {All, Disc, Running}
%% cluster status (order-insensitive), retrying for up to ~10 seconds.
assert_cluster_status(Status0, Nodes) ->
    Status = {AllNodes, _, _} = sort_cluster_status(Status0),
    wait_for_cluster_status(Status, AllNodes, Nodes).
+
%% Polls the expected cluster status for up to 10 seconds
%% (see wait_for_cluster_status/5) before giving up.
wait_for_cluster_status(Status, AllNodes, Nodes) ->
    %% Use integer division: `/' would yield the float 100.0, which
    %% works in the `N >= Max' guard but leaks a float into the
    %% max_tried error report.  Max is purely an iteration bound.
    Max = 10000 div ?LOOP_RECURSION_DELAY,
    wait_for_cluster_status(0, Max, Status, AllNodes, Nodes).
+
%% Retry loop behind assert_cluster_status/2: fails loudly once Max
%% attempts are exhausted, otherwise re-checks every
%% ?LOOP_RECURSION_DELAY milliseconds.
wait_for_cluster_status(N, Max, Status, _AllNodes, Nodes) when N >= Max ->
    erlang:error({cluster_status_max_tries_failed,
                  [{nodes, Nodes},
                   {expected_status, Status},
                   {max_tried, Max}]});
wait_for_cluster_status(N, Max, Status, AllNodes, Nodes) ->
    case lists:all(fun (Node) ->
                           verify_status_equal(Node, Status, AllNodes)
                   end, Nodes) of
        true  -> ok;
        false -> timer:sleep(?LOOP_RECURSION_DELAY),
                 wait_for_cluster_status(N + 1, Max, Status, AllNodes, Nodes)
    end.
+
%% A node matches when its sorted status equals Status AND its
%% is_clustered flag is consistent with the expected membership (a node
%% that is alone in AllNodes must report not clustered, and vice versa).
verify_status_equal(Node, Status, AllNodes) ->
    NodeStatus = sort_cluster_status(cluster_status(Node)),
    (AllNodes =/= [Node]) =:= rpc:call(Node, rabbit_mnesia, is_clustered, [])
        andalso NodeStatus =:= Status.

%% Fetches the {All, Disc, Running} node lists from Node via RPC.
cluster_status(Node) ->
    {rpc:call(Node, rabbit_mnesia, cluster_nodes, [all]),
     rpc:call(Node, rabbit_mnesia, cluster_nodes, [disc]),
     rpc:call(Node, rabbit_mnesia, cluster_nodes, [running])}.
+
%% Normalises a {All, Disc, Running} cluster-status triple by sorting
%% each node list, so two statuses can be compared with =:=.
sort_cluster_status({AllNodes, DiscNodes, RunningNodes}) ->
    {lists:sort(AllNodes),
     lists:sort(DiscNodes),
     lists:sort(RunningNodes)}.
+
%% All Nodes are disc members, all running, and every node agrees.
assert_clustered(Nodes) ->
    assert_cluster_status({Nodes, Nodes, Nodes}, Nodes).

%% Node is a standalone cluster of one.
assert_not_clustered(Node) ->
    assert_cluster_status({[Node], [Node], [Node]}, [Node]).
+
%% Runs Fun() expecting it to fail in one of the known ways; returns
%% the failure reason, or exits with {expected_failure, Result} if
%% Fun() succeeded.  Old-style `catch' is used deliberately so both
%% returned and thrown error tuples fall through to the same clauses.
assert_failure(Fun) ->
    case catch Fun() of
        {error, Reason}                -> Reason;
        {error_string, Reason}         -> Reason;
        {badrpc, {'EXIT', Reason}}     -> Reason;
        {badrpc_multi, Reason, _Nodes} -> Reason;
        Other                          -> exit({expected_failure, Other})
    end.
+
%% Thin wrappers around rabbitmqctl-style commands, executed on the
%% target node through rabbit_control_main (see control_action/4).

stop_app(Node) ->
    control_action(stop_app, Node).

start_app(Node) ->
    control_action(start_app, Node).

join_cluster(Node, To) ->
    join_cluster(Node, To, false).

%% Ram =:= true joins Node as a RAM node.
join_cluster(Node, To, Ram) ->
    control_action(join_cluster, Node, [atom_to_list(To)], [{"--ram", Ram}]).

reset(Node) ->
    control_action(reset, Node).

force_reset(Node) ->
    control_action(force_reset, Node).

%% RemoveWhenOffline =:= true performs an `--offline' removal.
forget_cluster_node(Node, Removee, RemoveWhenOffline) ->
    control_action(forget_cluster_node, Node, [atom_to_list(Removee)],
                   [{"--offline", RemoveWhenOffline}]).

forget_cluster_node(Node, Removee) ->
    forget_cluster_node(Node, Removee, false).

change_cluster_node_type(Node, Type) ->
    control_action(change_cluster_node_type, Node, [atom_to_list(Type)]).

update_cluster_nodes(Node, DiscoveryNode) ->
    control_action(update_cluster_nodes, Node, [atom_to_list(DiscoveryNode)]).
+
%% Stops the rabbit application on Node, joins it to ClusterTo
%% (optionally as a RAM node) and starts the application again.
stop_join_start(Node, ClusterTo, Ram) ->
    ok = stop_app(Node),
    ok = join_cluster(Node, ClusterTo, Ram),
    ok = start_app(Node).

stop_join_start(Node, ClusterTo) ->
    stop_join_start(Node, ClusterTo, false).

%% Stops the rabbit application, resets the node and restarts it,
%% leaving Node unclustered.
stop_reset_start(Node) ->
    ok = stop_app(Node),
    ok = reset(Node),
    ok = start_app(Node).

control_action(Command, Node) ->
    control_action(Command, Node, [], []).

control_action(Command, Node, Args) ->
    control_action(Command, Node, Args, []).

%% Executes a ctl command in-process on Node via RPC; output goes to
%% this process's io server through the fun passed as the formatter.
control_action(Command, Node, Args, Opts) ->
    rpc:call(Node, rabbit_control_main, action,
             [Command, Node, Args, Opts,
              fun io:format/2]).
+
%% Declares durable queue Name and binds it to the amq.fanout exchange;
%% returns the queue.declare_ok record (used for message_count checks).
declare(Ch, Name) ->
    Res = amqp_channel:call(Ch, #'queue.declare'{durable = true,
                                                 queue = Name}),
    amqp_channel:call(Ch, #'queue.bind'{queue = Name,
                                        exchange = <<"amq.fanout">>}),
    Res.
+
%% Checks that the `cluster_status' output lists a memory alarm for
%% Rabbit and a disk alarm for Hare in its alarms section.
alarm_information_on_each_node(Output, Rabbit, Hare) ->
    %% The word `alarms' must appear somewhere in the output.
    AlarmsAt = string:str(Output, "alarms"),
    true = AlarmsAt > 0,

    %% Only look at the text from `alarms' onwards: the section uses a
    %% `{Name, Value}' format, so we cannot accidentally match node
    %% names appearing in the preamble.
    AlarmsSection = string:substr(Output, AlarmsAt),
    assert_node_alarm(AlarmsSection, Rabbit, "memory"),
    assert_node_alarm(AlarmsSection, Hare, "disk"),

    ok.

%% Asserts that Section contains `{Node,[Resource]}', with the node
%% name optionally single-quoted.
assert_node_alarm(Section, Node, Resource) ->
    Pattern = "\\{'?" ++ atom_to_list(Node) ++ "'?,\\[" ++ Resource ++ "\\]\\}",
    match = re:run(Section, Pattern, [{capture, none}]).
diff --git a/test/crashing_queues_SUITE.erl b/test/crashing_queues_SUITE.erl
new file mode 100644
index 0000000000..872b771811
--- /dev/null
+++ b/test/crashing_queues_SUITE.erl
@@ -0,0 +1,269 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(crashing_queues_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+%% Common Test entry point: the suite runs a single group.
+all() ->
+    [
+      {group, cluster_size_2}
+    ].
+
+%% All testcases run sequentially (no parallel flag) on a 2-node cluster.
+groups() ->
+    [
+      {cluster_size_2, [], [
+          crashing_unmirrored,
+          crashing_mirrored,
+          give_up_after_repeated_crashes
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Log the environment once, then run the shared rabbit_ct_helpers setup.
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Record the cluster size for the group; the nodes themselves are
+%% started per-testcase in init_per_testcase/2.
+init_per_group(cluster_size_2, Config) ->
+    rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, 2}
+      ]).
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Start a fresh broker cluster (and client connections) for every
+%% testcase.  TCP ports are offset by testcase number so concurrent
+%% suites on the same host do not collide.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = ?config(rmq_nodes_count, Config),
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps()).
+
+%% Tear down clients first, then brokers, mirroring the setup order.
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Kill an unmirrored queue's process and check recovery: a durable
+%% queue comes back with its one persistent message, a transient queue
+%% comes back empty.  No mirrors are expected in either case.
+crashing_unmirrored(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+    ConnB = rabbit_ct_client_helpers:open_connection(Config, B),
+    QName = <<"crashing_unmirrored-q">>,
+    amqp_channel:call(ChA, #'confirm.select'{}),
+    test_queue_failure(A, ChA, ConnB, 1, 0,
+                       #'queue.declare'{queue = QName, durable = true}),
+    test_queue_failure(A, ChA, ConnB, 0, 0,
+                       #'queue.declare'{queue = QName, durable = false}),
+    ok.
+
+%% Same as above but with an ha-mode=all policy: after the crash both
+%% messages survive (the slave promotes/recovers them) and one slave
+%% is expected on the 2-node cluster.
+crashing_mirrored(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<".*">>, <<"all">>),
+    ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+    ConnB = rabbit_ct_client_helpers:open_connection(Config, B),
+    QName = <<"crashing_mirrored-q">>,
+    amqp_channel:call(ChA, #'confirm.select'{}),
+    test_queue_failure(A, ChA, ConnB, 2, 1,
+                       #'queue.declare'{queue = QName, durable = true}),
+    ok.
+
+%% Core scenario: publish one transient + one durable message, kill the
+%% queue process while a racer on another connection keeps re-declaring
+%% it (to exercise races with recovery), then assert the surviving
+%% message count and slave count.  The queue is deleted afterwards even
+%% if an assertion fails.
+test_queue_failure(Node, Ch, RaceConn, MsgCount, SlaveCount, Decl) ->
+    #'queue.declare_ok'{queue = QName} = amqp_channel:call(Ch, Decl),
+    try
+        publish(Ch, QName, transient),
+        publish(Ch, QName, durable),
+        Racer = spawn_declare_racer(RaceConn, Decl),
+        kill_queue(Node, QName),
+        assert_message_count(MsgCount, Ch, QName),
+        assert_slave_count(SlaveCount, Node, QName),
+        stop_declare_racer(Racer)
+    after
+        amqp_channel:call(Ch, #'queue.delete'{queue = QName})
+    end.
+
+%% After repeated rapid crashes the broker stops restarting the queue
+%% and leaves it in state `crashed'; declaring it then fails (channel
+%% dies).  Deleting and re-declaring from another node brings it back
+%% to `running'.  Finally, stopping node B checks the `down' state for
+%% an absent queue.
+give_up_after_repeated_crashes(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+    ChB = rabbit_ct_client_helpers:open_channel(Config, B),
+    QName = <<"give_up_after_repeated_crashes-q">>,
+    amqp_channel:call(ChA, #'confirm.select'{}),
+    amqp_channel:call(ChA, #'queue.declare'{queue   = QName,
+                                            durable = true}),
+    await_state(A, QName, running),
+    publish(ChA, QName, durable),
+    kill_queue_hard(A, QName),
+    %% Declaring a crashed queue makes the channel exit; catch that.
+    {'EXIT', _} = (catch amqp_channel:call(
+                           ChA, #'queue.declare'{queue   = QName,
+                                                 durable = true})),
+    await_state(A, QName, crashed),
+    amqp_channel:call(ChB, #'queue.delete'{queue = QName}),
+    amqp_channel:call(ChB, #'queue.declare'{queue   = QName,
+                                            durable = true}),
+    await_state(A, QName, running),
+
+    %% Since it's convenient, also test absent queue status here.
+    rabbit_ct_broker_helpers:stop_node(Config, B),
+    await_state(A, QName, down),
+    ok.
+
+
+%% Publish one message to the default exchange routed to `QName' with
+%% the given delivery mode, and wait for the publisher confirm (the
+%% channel must already be in confirm mode).
+publish(Ch, QName, DelMode) ->
+    Publish = #'basic.publish'{exchange = <<>>, routing_key = QName},
+    Msg = #amqp_msg{props = #'P_basic'{delivery_mode = del_mode(DelMode)}},
+    amqp_channel:cast(Ch, Publish, Msg),
+    amqp_channel:wait_for_confirms(Ch).
+
+%% AMQP delivery-mode codes: 1 = non-persistent, 2 = persistent.
+del_mode(transient) -> 1;
+del_mode(durable)   -> 2.
+
+%% Start a linked process that repeatedly re-declares the queue on
+%% `Conn' (see declare_racer_loop/3).
+spawn_declare_racer(Conn, Decl) ->
+    Self = self(),
+    spawn_link(fun() -> declare_racer_loop(Self, Conn, Decl) end).
+
+%% Ask the racer to stop and wait (via monitor) until it has exited.
+stop_declare_racer(Pid) ->
+    Pid ! stop,
+    MRef = erlang:monitor(process, Pid),
+    receive
+        {'DOWN', MRef, process, Pid, _} -> ok
+    end.
+
+%% Busy-loop opening a channel and re-declaring the queue until told to
+%% stop.  Unlinks from the parent on stop so the racer's exit cannot
+%% take the testcase down.
+declare_racer_loop(Parent, Conn, Decl) ->
+    receive
+        stop -> unlink(Parent)
+    after 0 ->
+            %% Catch here because we might happen to catch the queue
+            %% while it is in the middle of recovering and thus
+            %% explode with NOT_FOUND because crashed. Doesn't matter,
+            %% we are only in this loop to try to fool the recovery
+            %% code anyway.
+            try
+                case amqp_connection:open_channel(Conn) of
+                    {ok, Ch} -> amqp_channel:call(Ch, Decl);
+                    closing  -> ok
+                end
+            catch
+                exit:_ ->
+                    ok
+            end,
+            declare_racer_loop(Parent, Conn, Decl)
+    end.
+
+%% Poll (100ms interval, 30s default budget) until the queue reaches
+%% the expected state; exits with timeout_awaiting_state otherwise.
+await_state(Node, QName, State) ->
+    await_state(Node, QName, State, 30000).
+
+await_state(Node, QName, State, Time) ->
+    case state(Node, QName) of
+        State ->
+            ok;
+        Other ->
+            case Time of
+                0 -> exit({timeout_awaiting_state, State, Other});
+                _ -> timer:sleep(100),
+                     await_state(Node, QName, State, Time - 100)
+            end
+    end.
+
+%% Fetch the state of the queue in vhost "/" via RPC.  Returns
+%% `undefined' when no queues exist at all.  NOTE: the single-element
+%% match assumes the queue under test is the only queue in the vhost;
+%% any other queue present would cause a case_clause crash.
+state(Node, QName) ->
+    V = <<"/">>,
+    Res = rabbit_misc:r(V, queue, QName),
+    Infos = rpc:call(Node, rabbit_amqqueue, info_all, [V, [name, state]]),
+    case Infos of
+        []                               -> undefined;
+        [[{name, Res}, {state, State}]]  -> State
+    end.
+
+%% Keep killing the queue process until the broker gives up restarting
+%% it (queue_pid/2 reports `crashed').  Sleeps 100ms between kills so
+%% the crashes count as "rapid" from the supervisor's point of view.
+kill_queue_hard(Node, QName) ->
+    case kill_queue(Node, QName) of
+        crashed -> ok;
+        _NewPid -> timer:sleep(100),
+                   kill_queue_hard(Node, QName)
+    end.
+
+%% Kill the queue process once and wait until a different pid (or the
+%% atom `crashed') is observed.
+kill_queue(Node, QName) ->
+    Pid1 = queue_pid(Node, QName),
+    exit(Pid1, boom),
+    await_new_pid(Node, QName, Pid1).
+
+%% Current pid of the queue process, or the atom `crashed' when the
+%% broker has given up restarting it.  While state is `crashed' but the
+%% supervisor still has a child, a restart is in flight and the stored
+%% pid is returned.
+queue_pid(Node, QName) ->
+    #amqqueue{pid   = QPid,
+              state = State} = lookup(Node, QName),
+    case State of
+        crashed -> case sup_child(Node, rabbit_amqqueue_sup_sup) of
+                       {ok, _}           -> QPid;   %% restarting
+                       {error, no_child} -> crashed %% given up
+                   end;
+        _       -> QPid
+    end.
+
+%% Return the single child of supervisor `Sup' on `Node'.  Only 0 or 1
+%% children are expected here (one queue per testcase); a dead
+%% supervisor maps to {error, no_sup}.
+sup_child(Node, Sup) ->
+    case rpc:call(Node, supervisor2, which_children, [Sup]) of
+        [{_, Child, _, _}]              -> {ok, Child};
+        []                              -> {error, no_child};
+        {badrpc, {'EXIT', {noproc, _}}} -> {error, no_sup}
+    end.
+
+%% Look up the #amqqueue{} record for `QName' in vhost "/"; crashes
+%% (badmatch) if the queue does not exist.
+lookup(Node, QName) ->
+    {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup,
+                       [rabbit_misc:r(<<"/">>, queue, QName)]),
+    Q.
+
+%% Spin (10ms interval) until queue_pid/2 reports something other than
+%% `OldPid' — either the restarted process's pid or `crashed'.
+await_new_pid(Node, QName, OldPid) ->
+    case queue_pid(Node, QName) of
+        OldPid -> timer:sleep(10),
+                  await_new_pid(Node, QName, OldPid);
+        New    -> New
+    end.
+
+%% Assert the queue holds exactly `Count' messages, using a passive
+%% declare so nothing is created as a side effect.
+assert_message_count(Count, Ch, QName) ->
+    #'queue.declare_ok'{message_count = Count} =
+        amqp_channel:call(Ch, #'queue.declare'{queue   = QName,
+                                               passive = true}).
+
+%% Assert the queue has `Count' slaves.  slave_pids is reported as the
+%% atom '' when there are none.  Fewer slaves than expected may just
+%% mean mirroring is still catching up, so we retry; more than expected
+%% is a hard failure.
+assert_slave_count(Count, Node, QName) ->
+    Q = lookup(Node, QName),
+    [{_, Pids}] = rpc:call(Node, rabbit_amqqueue, info, [Q, [slave_pids]]),
+    RealCount = case Pids of
+                    '' -> 0;
+                    _  -> length(Pids)
+                end,
+    case RealCount of
+        Count ->
+            ok;
+        _ when RealCount < Count ->
+            timer:sleep(10),
+            assert_slave_count(Count, Node, QName);
+        _ ->
+            exit({too_many_slaves, Count, RealCount})
+    end.
diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl
new file mode 100644
index 0000000000..5872d97d4c
--- /dev/null
+++ b/test/dynamic_ha_SUITE.erl
@@ -0,0 +1,329 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(dynamic_ha_SUITE).
+
+%% rabbit_tests:test_dynamic_mirroring() is a unit test which should
+%% test the logic of what all the policies decide to do, so we don't
+%% need to exhaustively test that here. What we need to test is that:
+%%
+%% * Going from non-mirrored to mirrored works and vice versa
+%% * Changing policy can add / remove mirrors and change the master
+%% * Adding a node will create a new mirror when there are not enough nodes
+%% for the policy
+%% * Removing a node will not create a new mirror even if the policy
+%% logic wants it (since this gives us a good way to lose messages
+%% on cluster shutdown, by repeated failover to new nodes)
+%%
+%% The first two are change_policy, the last two are change_cluster
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("eunit/include/eunit.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(QNAME, <<"ha.test">>).
+-define(POLICY, <<"^ha.test$">>). %% " emacs
+-define(VHOST, <<"/">>).
+
+%% Common Test entry point: two top-level groups, distinguished by
+%% whether the nodes are started pre-clustered.
+all() ->
+    [
+      {group, unclustered},
+      {group, clustered}
+    ].
+
+%% change_cluster needs 5 initially-unclustered nodes; the remaining
+%% tests run on pre-clustered groups of 2 or 3 nodes.
+groups() ->
+    [
+      {unclustered, [], [
+          {cluster_size_5, [], [
+              change_cluster
+            ]}
+        ]},
+      {clustered, [], [
+          {cluster_size_2, [], [
+              vhost_deletion,
+              promote_on_shutdown
+            ]},
+          {cluster_size_3, [], [
+              change_policy,
+              rapid_change
+            ]}
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Log the environment once, then run the shared rabbit_ct_helpers setup.
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Group inits only record config flags (clustered? how many nodes?);
+%% the brokers themselves start in init_per_testcase/2.
+init_per_group(unclustered, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]);
+init_per_group(clustered, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]);
+init_per_group(cluster_size_2, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]);
+init_per_group(cluster_size_3, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]);
+init_per_group(cluster_size_5, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 5}]).
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Start a fresh broker cluster per testcase; TCP ports are offset by
+%% testcase number to avoid collisions between concurrently running
+%% suites on the same host.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = ?config(rmq_nodes_count, Config),
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps()).
+
+%% Tear down clients first, then brokers, mirroring the setup order.
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Walk a queue through the full policy life-cycle on a 3-node cluster:
+%% unmirrored -> ha-mode=all -> ha-mode=nodes (shrink, then move) ->
+%% policy cleared -> nodes policy excluding the master.  Each step's
+%% expected master/slaves is asserted, with permitted intermediate
+%% states where the transition is multi-step.
+change_policy(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+
+    %% When we first declare a queue with no policy, it's not HA.
+    amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME}),
+    assert_slaves(A, ?QNAME, {A, ''}),
+
+    %% Give it policy "all", it becomes HA and gets all mirrors
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY, <<"all">>),
+    assert_slaves(A, ?QNAME, {A, [B, C]}),
+
+    %% Give it policy "nodes", it gets specific mirrors
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY,
+      {<<"nodes">>, [rabbit_misc:atom_to_binary(A),
+                     rabbit_misc:atom_to_binary(B)]}),
+    assert_slaves(A, ?QNAME, {A, [B]}),
+
+    %% Now explicitly change the mirrors
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY,
+      {<<"nodes">>, [rabbit_misc:atom_to_binary(A),
+                     rabbit_misc:atom_to_binary(C)]}),
+    assert_slaves(A, ?QNAME, {A, [C]}, [{A, [B, C]}]),
+
+    %% Clear the policy, and we go back to non-mirrored
+    rabbit_ct_broker_helpers:clear_policy(Config, A, ?POLICY),
+    assert_slaves(A, ?QNAME, {A, ''}),
+
+    %% Test switching "away" from an unmirrored node
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY,
+      {<<"nodes">>, [rabbit_misc:atom_to_binary(B),
+                     rabbit_misc:atom_to_binary(C)]}),
+    assert_slaves(A, ?QNAME, {A, [B, C]}, [{A, [B]}, {A, [C]}]),
+
+    ok.
+
+%% With an "exactly 4" policy on a 3-node cluster the queue mirrors to
+%% everything available; adding nodes recruits a new mirror to meet the
+%% target, but removing a node must NOT recruit a replacement (see the
+%% module comment about losing messages on cluster shutdown).
+change_cluster(Config) ->
+    [A, B, C, D, E] = rabbit_ct_broker_helpers:get_node_configs(Config,
+      nodename),
+    rabbit_ct_broker_helpers:cluster_nodes(Config, [A, B, C]),
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+
+    amqp_channel:call(ACh, #'queue.declare'{queue = ?QNAME}),
+    assert_slaves(A, ?QNAME, {A, ''}),
+
+    %% Give it policy exactly 4, it should mirror to all 3 nodes
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, ?POLICY,
+      {<<"exactly">>, 4}),
+    assert_slaves(A, ?QNAME, {A, [B, C]}),
+
+    %% Add D and E, D joins in
+    rabbit_ct_broker_helpers:cluster_nodes(Config, [A, D, E]),
+    assert_slaves(A, ?QNAME, {A, [B, C, D]}),
+
+    %% Remove D, E joins in
+    rabbit_ct_broker_helpers:stop_node(Config, D),
+    assert_slaves(A, ?QNAME, {A, [B, C, E]}),
+
+    ok.
+
+%% Stress races between policy changes and queue life-cycle: one process
+%% runs 100 declare/publish/consume/delete cycles while the parent flips
+%% the HA policy on and off as fast as possible.  Success = the AMQP
+%% worker exits `normal'.
+rapid_change(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    {_Pid, MRef} = spawn_monitor(
+                     fun() ->
+                             [rapid_amqp_ops(ACh, I) || I <- lists:seq(1, 100)]
+                     end),
+    rapid_loop(Config, A, MRef),
+    ok.
+
+%% One full declare -> publish -> subscribe -> receive -> delete cycle;
+%% the payload is the iteration number so each delivery is matched to
+%% its own publish.
+rapid_amqp_ops(Ch, I) ->
+    Payload = list_to_binary(integer_to_list(I)),
+    amqp_channel:call(Ch, #'queue.declare'{queue = ?QNAME}),
+    amqp_channel:cast(Ch, #'basic.publish'{exchange = <<"">>,
+                                           routing_key = ?QNAME},
+                      #amqp_msg{payload = Payload}),
+    amqp_channel:subscribe(Ch, #'basic.consume'{queue    = ?QNAME,
+                                                no_ack   = true}, self()),
+    receive #'basic.consume_ok'{} -> ok
+    end,
+    receive {#'basic.deliver'{}, #amqp_msg{payload = Payload}} ->
+            ok
+    end,
+    amqp_channel:call(Ch, #'queue.delete'{queue = ?QNAME}).
+
+%% Flip the policy in a tight loop (after 0 = poll the mailbox without
+%% blocking) until the monitored worker goes down; a non-normal exit
+%% reason fails the test.
+rapid_loop(Config, Node, MRef) ->
+    receive
+        {'DOWN', MRef, process, _Pid, normal} ->
+            ok;
+        {'DOWN', MRef, process, _Pid, Reason} ->
+            exit({amqp_ops_died, Reason})
+    after 0 ->
+            rabbit_ct_broker_helpers:set_ha_policy(Config, Node, ?POLICY,
+              <<"all">>),
+            rabbit_ct_broker_helpers:clear_policy(Config, Node, ?POLICY),
+            rapid_loop(Config, Node, MRef)
+    end.
+
+%% Vhost deletion needs to successfully tear down policies and queues
+%% with policies. At least smoke-test that it doesn't blow up.
+%% Vhost deletion needs to successfully tear down policies and queues
+%% with policies. At least smoke-test that it doesn't blow up: the
+%% `ok =' match fails the test if rabbit_vhost:delete/1 errors.
+vhost_deletion(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    rabbit_ct_broker_helpers:set_ha_policy_all(Config),
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    amqp_channel:call(ACh, #'queue.declare'{queue = <<"vhost_deletion-q">>}),
+    ok = rpc:call(A, rabbit_vhost, delete, [<<"/">>]),
+    ok.
+
+%% Check `ha-promote-on-shutdown': with "always", an unsynchronised
+%% slave is promoted when the master shuts down cleanly (queue is
+%% reachable but empty); with the default, no promotion happens and
+%% declaring the queue yields 404 until the master returns with its
+%% 10 messages intact.
+promote_on_shutdown(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.promote">>,
+      <<"all">>, [{<<"ha-promote-on-shutdown">>, <<"always">>}]),
+    rabbit_ct_broker_helpers:set_ha_policy(Config, A, <<"^ha.nopromote">>,
+      <<"all">>),
+
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    [begin
+         amqp_channel:call(ACh, #'queue.declare'{queue   = Q,
+                                                 durable = true}),
+         rabbit_ct_client_helpers:publish(ACh, Q, 10)
+     end || Q <- [<<"ha.promote.test">>, <<"ha.nopromote.test">>]],
+    %% Restarting B leaves its slaves unsynchronised before A stops.
+    ok = rabbit_ct_broker_helpers:restart_node(Config, B),
+    ok = rabbit_ct_broker_helpers:stop_node(Config, A),
+    BCh = rabbit_ct_client_helpers:open_channel(Config, B),
+    #'queue.declare_ok'{message_count = 0} =
+        amqp_channel:call(
+          BCh, #'queue.declare'{queue   = <<"ha.promote.test">>,
+                                durable = true}),
+    ?assertExit(
+       {{shutdown, {server_initiated_close, 404, _}}, _},
+       amqp_channel:call(
+         BCh, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
+                               durable = true})),
+    ok = rabbit_ct_broker_helpers:start_node(Config, A),
+    ACh2 = rabbit_ct_client_helpers:open_channel(Config, A),
+    #'queue.declare_ok'{message_count = 10} =
+        amqp_channel:call(
+          ACh2, #'queue.declare'{queue   = <<"ha.nopromote.test">>,
+                                 durable = true}),
+    ok.
+
+%%----------------------------------------------------------------------------
+
+%% Assert the queue's master node and slave-node set, polling until the
+%% expectation holds.  `''' (the empty atom) means "no slaves at all"
+%% as opposed to an empty list.
+assert_slaves(RPCNode, QName, Exp) ->
+    assert_slaves(RPCNode, QName, Exp, []).
+
+%% The previously-asserted expectation (stashed in the process
+%% dictionary) is always a permitted intermediate state, since policy
+%% transitions are asynchronous.
+assert_slaves(RPCNode, QName, Exp, PermittedIntermediate) ->
+    assert_slaves0(RPCNode, QName, Exp,
+                  [{get(previous_exp_m_node), get(previous_exp_s_nodes)} |
+                   PermittedIntermediate]).
+
+assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes}, PermittedIntermediate) ->
+    Q = find_queue(QName, RPCNode),
+    Pid = proplists:get_value(pid, Q),
+    SPids = proplists:get_value(slave_pids, Q),
+    ActMNode = node(Pid),
+    ActSNodes = case SPids of
+                    '' -> '';
+                    _  -> [node(SPid) || SPid <- SPids]
+                end,
+    case ExpMNode =:= ActMNode andalso equal_list(ExpSNodes, ActSNodes) of
+        false ->
+            %% It's an async change, so if nothing has changed let's
+            %% just wait - of course this means if something does not
+            %% change when expected then we time out the test which is
+            %% a bit tedious
+            case [found || {PermMNode, PermSNodes} <- PermittedIntermediate,
+                           PermMNode =:= ActMNode,
+                           equal_list(PermSNodes, ActSNodes)] of
+                []  -> ct:fail("Expected ~p / ~p, got ~p / ~p~nat ~p~n",
+                               [ExpMNode, ExpSNodes, ActMNode, ActSNodes,
+                                get_stacktrace()]);
+                _   -> timer:sleep(100),
+                       assert_slaves0(RPCNode, QName, {ExpMNode, ExpSNodes},
+                                      PermittedIntermediate)
+            end;
+        true ->
+            %% Remember this expectation so the *next* assertion treats
+            %% it as a permitted intermediate state.
+            put(previous_exp_m_node, ExpMNode),
+            put(previous_exp_s_nodes, ExpSNodes),
+            ok
+    end.
+
+%% Order-insensitive list equality, where the atom '' (how rabbit
+%% reports "no slaves") only equals itself — never the empty list.
+%% Duplicates matter: each element of the first list consumes one
+%% occurrence from the second.
+equal_list('',    '')   -> true;
+equal_list('',    _Act) -> false;
+equal_list(_Exp,  '')   -> false;
+equal_list([],    [])   -> true;
+equal_list(_Exp,  [])   -> false;
+equal_list([],    _Act) -> false;
+equal_list([H|T], Act)  -> case lists:member(H, Act) of
+                               true  -> equal_list(T, Act -- [H]);
+                               false -> false
+                           end.
+
+%% Fetch the full info proplist for `QName' via RPC, polling until the
+%% queue exists (declares are asynchronous w.r.t. this observer).
+find_queue(QName, RPCNode) ->
+    Qs = rpc:call(RPCNode, rabbit_amqqueue, info_all, [?VHOST], infinity),
+    case find_queue0(QName, Qs) of
+        did_not_find_queue -> timer:sleep(100),
+                              find_queue(QName, RPCNode);
+        Q                  -> Q
+    end.
+
+%% Select the one queue whose resource name matches, or report a miss.
+find_queue0(QName, Qs) ->
+    case [Q || Q <- Qs, proplists:get_value(name, Q) =:=
+                   rabbit_misc:r(?VHOST, queue, QName)] of
+        [R] -> R;
+        []  -> did_not_find_queue
+    end.
+
+%% Capture the current stack for failure reports, using the classic
+%% pre-OTP-21 throw/catch trick.  NOTE(review): erlang:get_stacktrace/0
+%% was deprecated in OTP 21 and removed in OTP 24; on modern OTP this
+%% would need the `catch Class:Reason:Stacktrace' syntax instead.
+get_stacktrace() ->
+    try
+        throw(e)
+    catch
+        _:e ->
+            erlang:get_stacktrace()
+    end.
diff --git a/test/eager_sync_SUITE.erl b/test/eager_sync_SUITE.erl
new file mode 100644
index 0000000000..93b308b6c5
--- /dev/null
+++ b/test/eager_sync_SUITE.erl
@@ -0,0 +1,278 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(eager_sync_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(QNAME, <<"ha.two.test">>).
+-define(QNAME_AUTO, <<"ha.auto.test">>).
+-define(MESSAGE_COUNT, 2000).
+
+%% Common Test entry point: all sync tests share broker state (policies,
+%% queues), so they run in one strictly sequential group.
+all() ->
+    [
+      {group, non_parallel_tests}
+    ].
+
+groups() ->
+    [
+      {non_parallel_tests, [], [
+          eager_sync,
+          eager_sync_cancel,
+          eager_sync_auto,
+          eager_sync_auto_on_policy_change,
+          eager_sync_requeue
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Log the environment once, then run the shared rabbit_ct_helpers setup.
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+    Config.
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Every testcase gets a fresh 3-node pre-clustered broker plus the
+%% "two"/"auto" HA policies installed by the extra setup steps; ports
+%% are offset by testcase number to avoid clashes.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = 3,
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, ClusterSize},
+        {rmq_nodes_clustered, true},
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps() ++ [
+        fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1,
+        fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1
+      ]).
+
+%% Tear down clients first, then brokers, mirroring the setup order.
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Core eager-sync behaviour on a queue mirrored on A+B but not C:
+%% without an explicit sync, restarting both mirror-holding nodes loses
+%% the messages; with `rabbitmqctl sync_queue' the messages survive.
+%% Also covers the already-synced fast path and unacked messages.
+eager_sync(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    %% Queue is on AB but not C.
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, C),
+    amqp_channel:call(ACh, #'queue.declare'{queue   = ?QNAME,
+                                            durable = true}),
+
+    %% Don't sync, lose messages
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    restart(Config, A),
+    restart(Config, B),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME, 0),
+
+    %% Sync, keep messages
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    restart(Config, A),
+    ok = sync(C, ?QNAME),
+    restart(Config, B),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT),
+
+    %% Check the no-need-to-sync path
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    ok = sync(C, ?QNAME),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT),
+
+    %% keep unacknowledged messages
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    rabbit_ct_client_helpers:fetch(Ch, ?QNAME, 2),
+    restart(Config, A),
+    rabbit_ct_client_helpers:fetch(Ch, ?QNAME, 3),
+    sync(C, ?QNAME),
+    restart(Config, B),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME, ?MESSAGE_COUNT),
+
+    ok.
+
+%% Cancelling an in-progress sync: sync batch size is dropped to 1 so
+%% the sync is slow enough to be observed and cancelled (see the
+%% comment on set_app_sync_batch_size/1 below).
+eager_sync_cancel(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    %% Queue is on AB but not C.
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, C),
+
+    set_app_sync_batch_size(A),
+    set_app_sync_batch_size(B),
+    set_app_sync_batch_size(C),
+
+    amqp_channel:call(ACh, #'queue.declare'{queue   = ?QNAME,
+                                            durable = true}),
+    {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence
+    eager_sync_cancel_test2(Config, A, B, C, Ch).
+
+%% Start an async sync and try to cancel it mid-flight.  The sync can
+%% legitimately finish before we catch it at two distinct points, so
+%% both race outcomes purge the queue and retry the whole scenario.
+eager_sync_cancel_test2(Config, A, B, C, Ch) ->
+    %% Sync then cancel
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    restart(Config, A),
+    set_app_sync_batch_size(A),
+    spawn_link(fun() -> ok = sync_nowait(C, ?QNAME) end),
+    case wait_for_syncing(C, ?QNAME, 1) of
+        ok ->
+            case sync_cancel(C, ?QNAME) of
+                ok ->
+                    wait_for_running(C, ?QNAME),
+                    restart(Config, B),
+                    set_app_sync_batch_size(B),
+                    %% Cancelled sync => messages were lost on restart.
+                    rabbit_ct_client_helpers:consume(Ch, ?QNAME, 0),
+
+                    {ok, not_syncing} = sync_cancel(C, ?QNAME), %% Idempotence
+                    ok;
+                {ok, not_syncing} ->
+                    %% Damn. Syncing finished between wait_for_syncing/3 and
+                    %% sync_cancel/2 above. Start again.
+                    amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}),
+                    eager_sync_cancel_test2(Config, A, B, C, Ch)
+            end;
+        synced_already ->
+            %% Damn. Syncing finished before wait_for_syncing/3. Start again.
+            amqp_channel:call(Ch, #'queue.purge'{queue = ?QNAME}),
+            eager_sync_cancel_test2(Config, A, B, C, Ch)
+    end.
+
+%% With the ha-sync-mode=automatic policy (?QNAME_AUTO matches it),
+%% mirrors sync themselves after each restart, so no messages are lost
+%% without any explicit sync command.
+eager_sync_auto(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, C),
+    amqp_channel:call(ACh, #'queue.declare'{queue   = ?QNAME_AUTO,
+                                            durable = true}),
+
+    %% Sync automatically, don't lose messages
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME_AUTO, ?MESSAGE_COUNT),
+    restart(Config, A),
+    wait_for_sync(C, ?QNAME_AUTO),
+    restart(Config, B),
+    wait_for_sync(C, ?QNAME_AUTO),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME_AUTO, ?MESSAGE_COUNT),
+
+    ok.
+
+%% Changing an existing queue's policy to ha-sync-mode=automatic must
+%% itself trigger a sync of the unsynchronised mirror.
+eager_sync_auto_on_policy_change(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    %% Queue is on AB but not C.
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, C),
+    amqp_channel:call(ACh, #'queue.declare'{queue   = ?QNAME,
+                                            durable = true}),
+
+    %% Sync automatically once the policy is changed to tell us to.
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    restart(Config, A),
+    Params = [rabbit_misc:atom_to_binary(N) || N <- [A, B]],
+    rabbit_ct_broker_helpers:set_ha_policy(Config,
+      A, <<"^ha.two.">>, {<<"nodes">>, Params},
+      [{<<"ha-sync-mode">>, <<"automatic">>}]),
+    wait_for_sync(C, ?QNAME),
+
+    ok.
+
+%% Requeued (rejected) messages must survive syncing: one message is
+%% requeued before the sync, one while still unacked during it, and
+%% both must be consumable afterwards.
+eager_sync_requeue(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    %% Queue is on AB but not C.
+    ACh = rabbit_ct_client_helpers:open_channel(Config, A),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, C),
+    amqp_channel:call(ACh, #'queue.declare'{queue   = ?QNAME,
+                                            durable = true}),
+
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, 2),
+    {#'basic.get_ok'{delivery_tag = TagA}, _} =
+        amqp_channel:call(Ch, #'basic.get'{queue = ?QNAME}),
+    {#'basic.get_ok'{delivery_tag = TagB}, _} =
+        amqp_channel:call(Ch, #'basic.get'{queue = ?QNAME}),
+    amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = TagA, requeue = true}),
+    restart(Config, B),
+    ok = sync(C, ?QNAME),
+    amqp_channel:cast(Ch, #'basic.reject'{delivery_tag = TagB, requeue = true}),
+    rabbit_ct_client_helpers:consume(Ch, ?QNAME, 2),
+
+    ok.
+
+%% Restart a broker node via the CT broker helpers.
+restart(Config, Node) ->
+    rabbit_ct_broker_helpers:restart_broker(Config, Node).
+
+%% Kick off a sync and block until it completes; any error from
+%% sync_nowait/2 is passed through unchanged.
+sync(Node, QName) ->
+    case sync_nowait(Node, QName) of
+        ok -> wait_for_sync(Node, QName),
+              ok;
+        R  -> R
+    end.
+
+%% Thin wrappers over the rabbitmqctl sync/cancel-sync commands.
+sync_nowait(Node, QName) -> action(Node, sync_queue, QName).
+sync_cancel(Node, QName) -> action(Node, cancel_sync_queue, QName).
+
+%% Delegate to sync_detection_SUITE's poller for "fully synced" status.
+wait_for_sync(Node, QName) ->
+    sync_detection_SUITE:wait_for_sync_status(true, Node, QName).
+
+%% Run a rabbitmqctl action against `QName' in vhost "/".
+action(Node, Action, QName) ->
+    rabbit_ct_broker_helpers:control_action(
+      Action, Node, [binary_to_list(QName)], [{"-p", "/"}]).
+
+%% Look up the #amqqueue{} record for `QName' in vhost "/" via RPC;
+%% crashes (badmatch) if the queue does not exist.
+queue(Node, QName) ->
+    QNameRes = rabbit_misc:r(<<"/">>, queue, QName),
+    {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup, [QNameRes]),
+    Q.
+
+%% Poll until the queue is observed in the `syncing' state; if instead
+%% it is already `running' with `Target' synchronised slaves, the sync
+%% finished before we looked — report `synced_already' so the caller
+%% can restart the scenario.
+wait_for_syncing(Node, QName, Target) ->
+    case state(Node, QName) of
+        {{syncing, _}, _} -> ok;
+        {running, Target} -> synced_already;
+        _                 -> timer:sleep(100),
+                             wait_for_syncing(Node, QName, Target)
+    end.
+
+%% Poll until the queue is back in the `running' state (e.g. after a
+%% sync has been cancelled).
+wait_for_running(Node, QName) ->
+    case state(Node, QName) of
+        {running, _} -> ok;
+        _            -> timer:sleep(100),
+                        wait_for_running(Node, QName)
+    end.
+
+%% {State, NumberOfSynchronisedSlaves} for the queue, fetched via RPC.
+state(Node, QName) ->
+    [{state, State}, {synchronised_slave_pids, Pids}] =
+        rpc:call(Node, rabbit_amqqueue, info,
+                 [queue(Node, QName), [state, synchronised_slave_pids]]),
+    {State, length(Pids)}.
+
+%% eager_sync_cancel_test needs a batch size that's < ?MESSAGE_COUNT
+%% in order to pass, because a SyncBatchSize >= ?MESSAGE_COUNT will
+%% always finish before the test is able to cancel the sync.
+set_app_sync_batch_size(Node) ->
+    rabbit_ct_broker_helpers:control_action(
+      eval, Node,
+      ["application:set_env(rabbit, mirroring_sync_batch_size, 1)."]).
diff --git a/test/inet_proxy_dist.erl b/test/inet_proxy_dist.erl
new file mode 100644
index 0000000000..32b7641a79
--- /dev/null
+++ b/test/inet_proxy_dist.erl
@@ -0,0 +1,201 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(inet_proxy_dist).
+
+%% A distribution plugin that uses the usual inet_tcp_dist but allows
+%% insertion of a proxy at the receiving end.
+
+%% inet_*_dist "behaviour"
+-export([listen/1, accept/1, accept_connection/5,
+ setup/5, close/1, select/1, is_node_name/1]).
+
+%% For copypasta from inet_tcp_dist
+-export([do_setup/6]).
+-import(error_logger,[error_msg/2]).
+
+-define(REAL, inet_tcp_dist).
+
+%%----------------------------------------------------------------------------
+
+%% Everything except setup/5 is delegated verbatim to the real
+%% distribution module (?REAL = inet_tcp_dist); only outgoing connection
+%% setup is intercepted so it can be routed through the TCP proxy.
+listen(Name) -> ?REAL:listen(Name).
+select(Node) -> ?REAL:select(Node).
+accept(Listen) -> ?REAL:accept(Listen).
+close(Socket) -> ?REAL:close(Socket).
+is_node_name(Node) -> ?REAL:is_node_name(Node).
+
+%% Inbound connections are untouched; the proxy sits in front of the
+%% listener instead (see inet_tcp_proxy).
+accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime) ->
+    ?REAL:accept_connection(AcceptPid, Socket, MyNode, Allowed, SetupTime).
+
+%% This is copied from inet_tcp_dist, in order to change the
+%% output of erl_epmd:port_please/2.
+
+-include_lib("kernel/include/net_address.hrl").
+-include_lib("kernel/include/dist_util.hrl").
+
+%% Distribution-module callback: spawn the (modified) connection setup in
+%% a max-priority linked process, exactly as inet_tcp_dist does.
+setup(Node, Type, MyNode, LongOrShortNames,SetupTime) ->
+    spawn_opt(?MODULE, do_setup,
+              [self(), Node, Type, MyNode, LongOrShortNames, SetupTime],
+              [link, {priority, max}]).
+
+%% Copy of inet_tcp_dist:do_setup/6 with one change (between the
+%% "Modification START/END" markers): after epmd tells us the peer's real
+%% distribution port, we look that port up in the kernel env
+%% `dist_and_proxy_ports_map` and, if the proxy is enabled, connect to the
+%% mapped proxy port instead, registering the mapping with
+%% inet_tcp_proxy_manager so the receiving proxy can identify us.
+do_setup(Kernel, Node, Type, MyNode, LongOrShortNames,SetupTime) ->
+    ?trace("~p~n",[{inet_tcp_dist,self(),setup,Node}]),
+    [Name, Address] = splitnode(Node, LongOrShortNames),
+    case inet:getaddr(Address, inet) of
+        {ok, Ip} ->
+            Timer = dist_util:start_timer(SetupTime),
+            case erl_epmd:port_please(Name, Ip) of
+                {port, TcpPort, Version} ->
+                    ?trace("port_please(~p) -> version ~p~n",
+                           [Node,Version]),
+                    dist_util:reset_timer(Timer),
+                    %% Modification START
+                    %% Map the peer's real dist port to its proxy port
+                    %% (defaulting to the real port when unmapped or the
+                    %% proxy is disabled).
+                    Ret = application:get_env(kernel,
+                                              dist_and_proxy_ports_map),
+                    PortsMap = case Ret of
+                        {ok, M} -> M;
+                        undefined -> []
+                    end,
+                    ProxyPort = case inet_tcp_proxy:is_enabled() of
+                        true -> proplists:get_value(TcpPort, PortsMap, TcpPort);
+                        false -> TcpPort
+                    end,
+                    case inet_tcp:connect(Ip, ProxyPort,
+                                          [{active, false},
+                                           {packet,2}]) of
+                        {ok, Socket} ->
+                            {ok, {_, SrcPort}} = inet:sockname(Socket),
+                            ok = inet_tcp_proxy_manager:register(
+                                   node(), Node, SrcPort, TcpPort, ProxyPort),
+                            %% Modification END
+                            HSData = #hs_data{
+                              kernel_pid = Kernel,
+                              other_node = Node,
+                              this_node = MyNode,
+                              socket = Socket,
+                              timer = Timer,
+                              this_flags = 0,
+                              other_version = Version,
+                              f_send = fun inet_tcp:send/2,
+                              f_recv = fun inet_tcp:recv/3,
+                              f_setopts_pre_nodeup =
+                              fun(S) ->
+                                      inet:setopts
+                                        (S,
+                                         [{active, false},
+                                          {packet, 4},
+                                          nodelay()])
+                              end,
+                              f_setopts_post_nodeup =
+                              fun(S) ->
+                                      inet:setopts
+                                        (S,
+                                         [{active, true},
+                                          {deliver, port},
+                                          {packet, 4},
+                                          nodelay()])
+                              end,
+                              f_getll = fun inet:getll/1,
+                              f_address =
+                              fun(_,_) ->
+                                      #net_address{
+                                   address = {Ip,TcpPort},
+                                   host = Address,
+                                   protocol = tcp,
+                                   family = inet}
+                              end,
+                              mf_tick = fun tick/1,
+                              mf_getstat = fun inet_tcp_dist:getstat/1,
+                              request_type = Type
+                             },
+                            dist_util:handshake_we_started(HSData);
+                        R ->
+                            io:format("~p failed! ~p~n", [node(), R]),
+                            %% Other Node may have closed since
+                            %% port_please !
+                            ?trace("other node (~p) "
+                                   "closed since port_please.~n",
+                                   [Node]),
+                            ?shutdown(Node)
+                    end;
+                _ ->
+                    ?trace("port_please (~p) "
+                           "failed.~n", [Node]),
+                    ?shutdown(Node)
+            end;
+        _Other ->
+            ?trace("inet_getaddr(~p) "
+                   "failed (~p).~n", [Node,_Other]),
+            ?shutdown(Node)
+    end.
+
+%% If Node is illegal terminate the connection setup!!
+%% Splits 'name@host' into [Name, Host], validating that the host part
+%% matches the long/short-names mode of this node (copied verbatim from
+%% inet_tcp_dist).
+splitnode(Node, LongOrShortNames) ->
+    case split_node(atom_to_list(Node), $@, []) of
+        [Name|Tail] when Tail =/= [] ->
+            Host = lists:append(Tail),
+            case split_node(Host, $., []) of
+                [_] when LongOrShortNames =:= longnames ->
+                    error_msg("** System running to use "
+                              "fully qualified "
+                              "hostnames **~n"
+                              "** Hostname ~s is illegal **~n",
+                              [Host]),
+                    ?shutdown(Node);
+                L when length(L) > 1, LongOrShortNames =:= shortnames ->
+                    error_msg("** System NOT running to use fully qualified "
+                              "hostnames **~n"
+                              "** Hostname ~s is illegal **~n",
+                              [Host]),
+                    ?shutdown(Node);
+                _ ->
+                    [Name, Host]
+            end;
+        [_] ->
+            error_msg("** Nodename ~p illegal, no '@' character **~n",
+                      [Node]),
+            ?shutdown(Node);
+        _ ->
+            error_msg("** Nodename ~p illegal **~n", [Node]),
+            ?shutdown(Node)
+    end.
+
+%% Split a string on every occurrence of Chr, e.g.
+%% split_node("a@b", $@, []) -> ["a", "b"].
+split_node([Chr|T], Chr, Ack) -> [lists:reverse(Ack)|split_node(T, Chr, [])];
+split_node([H|T], Chr, Ack) -> split_node(T, Chr, [H|Ack]);
+split_node([], _, Ack) -> [lists:reverse(Ack)].
+
+%% we may not always want the nodelay behaviour
+%% for performance reasons
+%% Returns the {nodelay, _} socket option, controlled by the kernel env
+%% `dist_nodelay`; any value other than {ok, false} yields {nodelay, true}.
+nodelay() ->
+    case application:get_env(kernel, dist_nodelay) of
+        undefined ->
+            {nodelay, true};
+        {ok, true} ->
+            {nodelay, true};
+        {ok, false} ->
+            {nodelay, false};
+        _ ->
+            {nodelay, true}
+    end.
+
+%% Distribution tick callback (mf_tick): force an empty packet out; if the
+%% socket is closed, notify ourselves the same way active mode would so
+%% dist_util tears the connection down.
+tick(Socket) ->
+    case inet_tcp:send(Socket, [], [force]) of
+        {error, closed} ->
+            self() ! {tcp_closed, Socket},
+            {error, closed};
+        R ->
+            R
+    end.
diff --git a/test/inet_tcp_proxy.erl b/test/inet_tcp_proxy.erl
new file mode 100644
index 0000000000..4498b8f952
--- /dev/null
+++ b/test/inet_tcp_proxy.erl
@@ -0,0 +1,134 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(inet_tcp_proxy).
+
+%% A TCP proxy for insertion into the Erlang distribution mechanism,
+%% which allows us to simulate network partitions.
+
+-export([start/3, reconnect/1, is_enabled/0, allow/1, block/1]).
+
+-define(TABLE, ?MODULE).
+
+%% This can't start_link because there's no supervision hierarchy we
+%% can easily fit it into (we need to survive all application
+%% restarts). So we have to do some horrible error handling.
+
+%% Start the proxy listener on ProxyPort, forwarding to the local dist
+%% port DistPort.  Blocks until the listener is ready; returns ok or
+%% {error, Reason} if the spawned process dies first.  Note: no receive
+%% timeout — assumes the child always sends `ready` or crashes.
+start(ManagerNode, DistPort, ProxyPort) ->
+    application:set_env(kernel, inet_tcp_proxy_manager_node, ManagerNode),
+    Parent = self(),
+    Pid = spawn(error_handler(fun() -> go(Parent, DistPort, ProxyPort) end)),
+    MRef = erlang:monitor(process, Pid),
+    receive
+        ready ->
+            erlang:demonitor(MRef),
+            ok;
+        {'DOWN', MRef, _, _, Reason} ->
+            {error, Reason}
+    end.
+
+%% Drop existing dist connections to Nodes so they are re-established
+%% through the proxy on next use.
+reconnect(Nodes) ->
+    [erlang:disconnect_node(N) || N <- Nodes, N =/= node()],
+    ok.
+
+%% The proxy is considered enabled iff its named ETS table exists.
+is_enabled() ->
+    lists:member(?TABLE, ets:all()).
+
+%% Re-allow traffic to Node by removing its block marker from the table;
+%% run_loop/3 then flushes any buffered data.
+allow(Node) ->
+    rabbit_log:info("(~s) Allowing distribution between ~s and ~s~n",
+                    [?MODULE, node(), Node]),
+    ets:delete(?TABLE, Node).
+%% Mark Node as blocked; run_loop/3 buffers (rather than forwards) its
+%% traffic while this entry exists, simulating a partition.
+block(Node) ->
+    rabbit_log:info("(~s) BLOCKING distribution between ~s and ~s~n",
+                    [?MODULE, node(), Node]),
+    ets:insert(?TABLE, {Node, block}).
+
+%%----------------------------------------------------------------------------
+
+%% Wrap Thunk so that a nodedown towards the test runner exits quietly,
+%% while any other exception halts the whole VM (these processes are
+%% unsupervised, so a silent death would hang the test).
+error_handler(Thunk) ->
+    fun () ->
+            try
+                Thunk()
+            catch _:{{nodedown, _}, _} ->
+                    %% The only other node we ever talk to is the test
+                    %% runner; if that's down then the test is nearly
+                    %% over; die quietly.
+                    ok;
+                  _:X ->
+                    io:format(user, "TCP proxy died with ~p~n At ~p~n",
+                              [X, erlang:get_stacktrace()]),
+                    erlang:halt(1)
+            end
+    end.
+
+%% Listener body: create the (public, named) block table, open the
+%% listening socket on ProxyPort, tell the parent we're ready, then
+%% accept forever, forwarding each connection to local Port.
+go(Parent, Port, ProxyPort) ->
+    ets:new(?TABLE, [public, named_table]),
+    {ok, Sock} = gen_tcp:listen(ProxyPort, [inet,
+                                            {reuseaddr, true}]),
+    Parent ! ready,
+    accept_loop(Sock, Port).
+
+%% Accept loop: one proxy process per inbound connection; socket ownership
+%% is handed to the proxy so it receives the {tcp, ...} messages.
+accept_loop(ListenSock, Port) ->
+    {ok, Sock} = gen_tcp:accept(ListenSock),
+    Proxy = spawn(error_handler(fun() -> run_it(Sock, Port) end)),
+    ok = gen_tcp:controlling_process(Sock, Proxy),
+    accept_loop(ListenSock, Port).
+
+%% Identify the connecting node via the manager (keyed on the peer's
+%% source port, registered by inet_proxy_dist:do_setup/6), sanity-check
+%% we are the intended destination, then open the outbound leg to the
+%% real dist port and start relaying.  Peers that never registered
+%% (non-dist connections) are silently dropped.
+run_it(SockIn, Port) ->
+    case {inet:peername(SockIn), inet:sockname(SockIn)} of
+        {{ok, {_Addr, SrcPort}}, {ok, {Addr, _OtherPort}}} ->
+            {ok, Remote, This} = inet_tcp_proxy_manager:lookup(SrcPort),
+            case node() of
+                This -> ok;
+                _    -> exit({not_me, node(), This})
+            end,
+            {ok, SockOut} = gen_tcp:connect(Addr, Port, [inet]),
+            run_loop({SockIn, SockOut}, Remote, []);
+        _ ->
+            ok
+    end.
+
+%% Relay loop for one proxied connection.  On every message we re-read
+%% the block table: while blocked, incoming data is accumulated in Buf
+%% (newest first); once unblocked the buffer is flushed in order.  The
+%% process dictionary key `dist_was_blocked` is used only to log each
+%% block/unblock transition exactly once.
+run_loop(Sockets, RemoteNode, Buf0) ->
+    Block = [{RemoteNode, block}] =:= ets:lookup(?TABLE, RemoteNode),
+    receive
+        {tcp, Sock, Data} ->
+            Buf = [Data | Buf0],
+            case {Block, get(dist_was_blocked)} of
+                {true, false} ->
+                    put(dist_was_blocked, Block),
+                    rabbit_log:warning(
+                      "(~s) Distribution BLOCKED between ~s and ~s~n",
+                      [?MODULE, node(), RemoteNode]);
+                {false, S} when S =:= true orelse S =:= undefined ->
+                    put(dist_was_blocked, Block),
+                    rabbit_log:warning(
+                      "(~s) Distribution allowed between ~s and ~s~n",
+                      [?MODULE, node(), RemoteNode]);
+                _ ->
+                    ok
+            end,
+            case Block of
+                false -> gen_tcp:send(other(Sock, Sockets), lists:reverse(Buf)),
+                         run_loop(Sockets, RemoteNode, []);
+                true  -> run_loop(Sockets, RemoteNode, Buf)
+            end;
+        {tcp_closed, Sock} ->
+            gen_tcp:close(other(Sock, Sockets));
+        X ->
+            exit({weirdness, X})
+    end.
+
+%% Given one socket of the pair, return the opposite one.
+other(A, {A, B}) -> B;
+other(B, {A, B}) -> A.
diff --git a/test/inet_tcp_proxy_manager.erl b/test/inet_tcp_proxy_manager.erl
new file mode 100644
index 0000000000..18255b8d48
--- /dev/null
+++ b/test/inet_tcp_proxy_manager.erl
@@ -0,0 +1,107 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(inet_tcp_proxy_manager).
+
+%% The TCP proxies need to decide whether to block based on the node
+%% they're running on, and the node connecting to them. The trouble
+%% is, they don't have an easy way to determine the latter. Therefore
+%% when A connects to B we register the source port used by A here, so
+%% that B can later look it up and find out who A is without having to
+%% sniff the distribution protocol.
+%%
+%% That does unfortunately mean that we need a central control
+%% thing. We assume here it's running on the node called
+%% 'standalone_test' since that's where tests are orchestrated from.
+%%
+%% Yes, this leaks. For its intended lifecycle, that's fine.
+
+-behaviour(gen_server).
+
+-export([start/0, register/5, lookup/1]).
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
+ code_change/3]).
+
+-define(NODE, ct).
+
+-record(state, {ports, pending}).
+
+%% Start the (unlinked, locally registered) manager on the controller node.
+start() ->
+    gen_server:start({local, ?MODULE}, ?MODULE, [], []).
+
+%% Record that From connected to To using local source port SrcPort.
+register(_From, _To, _SrcPort, Port, Port) ->
+    %% No proxy, don't register
+    ok;
+register(From, To, SrcPort, _Port, _ProxyPort) ->
+    gen_server:call(name(), {register, From, To, SrcPort}, infinity).
+
+%% Resolve a peer source port to {ok, FromNode, ToNode}; blocks (server
+%% side, see handle_call) until the matching register arrives.
+lookup(SrcPort) ->
+    gen_server:call(name(), {lookup, SrcPort}, infinity).
+
+%% The node running the manager, stashed in the kernel env by
+%% inet_tcp_proxy:start/3.  Crashes if never configured.
+controller_node() ->
+    {ok, ManagerNode} = application:get_env(kernel,
+                                            inet_tcp_proxy_manager_node),
+    ManagerNode.
+
+%% Remote registered-name reference for gen_server calls.
+name() ->
+    {?MODULE, controller_node()}.
+
+%%----------------------------------------------------------------------------
+
+%% State: `ports` maps a source port to {FromNode, ToNode}; `pending`
+%% holds lookup callers waiting for a not-yet-registered port.  We also
+%% subscribe to nodeup/nodedown so stale entries can be pruned.
+init([]) ->
+    net_kernel:monitor_nodes(true),
+    {ok, #state{ports = dict:new(),
+                pending = []}}.
+
+%% A registration arrives: store the mapping and wake any lookup callers
+%% that were parked waiting for this source port.
+handle_call({register, FromNode, ToNode, SrcPort}, _From,
+            State = #state{ports = Ports,
+                           pending = Pending}) ->
+    {Notify, Pending2} =
+        lists:partition(fun ({P, _}) -> P =:= SrcPort end, Pending),
+    [gen_server:reply(From, {ok, FromNode, ToNode}) || {_, From} <- Notify],
+    {reply, ok,
+     State#state{ports = dict:store(SrcPort, {FromNode, ToNode}, Ports),
+                 pending = Pending2}};
+
+%% A lookup for an unknown port does not fail: the caller is parked in
+%% `pending` (noreply) and answered later by the matching register.
+handle_call({lookup, SrcPort}, From,
+            State = #state{ports = Ports, pending = Pending}) ->
+    case dict:find(SrcPort, Ports) of
+        {ok, {FromNode, ToNode}} ->
+            {reply, {ok, FromNode, ToNode}, State};
+        error ->
+            {noreply, State#state{pending = [{SrcPort, From} | Pending]}}
+    end;
+
+handle_call(_Req, _From, State) ->
+    {reply, unknown_request, State}.
+
+handle_cast(_C, State) ->
+    {noreply, State}.
+
+%% Drop all mappings involving a node that went down; its ports may be
+%% reused by a later incarnation.
+handle_info({nodedown, Node}, State = #state{ports = Ports}) ->
+    Ports1 = dict:filter(
+               fun (_, {From, To}) ->
+                       Node =/= From andalso Node =/= To
+               end, Ports),
+    {noreply, State#state{ports = Ports1}};
+
+handle_info(_I, State) ->
+    {noreply, State}.
+
+terminate(_Reason, _State) ->
+    ok.
+
+code_change(_, State, _) -> {ok, State}.
diff --git a/test/lazy_queue_SUITE.erl b/test/lazy_queue_SUITE.erl
new file mode 100644
index 0000000000..fe105cddd0
--- /dev/null
+++ b/test/lazy_queue_SUITE.erl
@@ -0,0 +1,224 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(lazy_queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(QNAME, <<"queue.mode.test">>).
+-define(MESSAGE_COUNT, 2000).
+
+%% Common Test entry point: a single sequential group (these cases mutate
+%% cluster-wide policy, so they must not run in parallel).
+all() ->
+    [
+      {group, non_parallel_tests}
+    ].
+
+groups() ->
+    [
+      {non_parallel_tests, [], [
+          declare_args,
+          queue_mode_policy,
+          publish_consume
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(_, Config) ->
+    Config.
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Each testcase gets its own fresh 2-node cluster, mirrored everywhere
+%% (set_ha_policy_all).  tcp_ports_base is offset per testcase so
+%% concurrent suite runs don't collide on ports.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = 2,
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, ClusterSize},
+        {rmq_nodes_clustered, true},
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps() ++ [
+        fun rabbit_ct_broker_helpers:set_ha_policy_all/1
+      ]).
+
+%% Tear the per-testcase cluster back down.
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Declaring with x-queue-mode=lazy/default (or no argument at all) must
+%% put the backing queue into the corresponding mode.
+declare_args(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    LQ = <<"lazy-q">>,
+    declare(Ch, LQ, [{<<"x-queue-mode">>, longstr, <<"lazy">>}]),
+    assert_queue_mode(A, LQ, lazy),
+
+    DQ = <<"default-q">>,
+    declare(Ch, DQ, [{<<"x-queue-mode">>, longstr, <<"default">>}]),
+    assert_queue_mode(A, DQ, default),
+
+    %% No argument at all: mode must default to `default`.
+    DQ2 = <<"default-q2">>,
+    declare(Ch, DQ2),
+    assert_queue_mode(A, DQ2, default),
+
+    passed.
+
+%% Queue arguments must take precedence over policy: with a lazy policy
+%% in force, an explicit x-queue-mode argument still wins; queues without
+%% the argument follow the policy, including when it later changes.
+queue_mode_policy(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+    set_ha_mode_policy(Config, A, <<"lazy">>),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+
+    LQ = <<"lazy-q">>,
+    declare(Ch, LQ, [{<<"x-queue-mode">>, longstr, <<"lazy">>}]),
+    assert_queue_mode(A, LQ, lazy),
+
+    %% No argument: picks up `lazy` from the policy.
+    LQ2 = <<"lazy-q-2">>,
+    declare(Ch, LQ2),
+    assert_queue_mode(A, LQ2, lazy),
+
+    %% Argument overrides the lazy policy.
+    DQ = <<"default-q">>,
+    declare(Ch, DQ, [{<<"x-queue-mode">>, longstr, <<"default">>}]),
+    assert_queue_mode(A, DQ, default),
+
+    set_ha_mode_policy(Config, A, <<"default">>),
+
+    %% Argument-pinned queue stays lazy; policy-driven queue follows the
+    %% policy change (each within a 5s window, polling).
+    ok = wait_for_queue_mode(A, LQ, lazy, 5000),
+    ok = wait_for_queue_mode(A, LQ2, default, 5000),
+    ok = wait_for_queue_mode(A, DQ, default, 5000),
+
+    passed.
+
+%% Publish/consume across repeated lazy<->default mode flips: every batch
+%% of ?MESSAGE_COUNT published messages must still be delivered in order
+%% (payloads 1..N) regardless of when the mode changed.
+%% NOTE(review): consume/3 is called with `ack` which enables manual-ack
+%% mode, but maybe_ack/3 only acks on `do_ack`, so deliveries here remain
+%% unacked for the duration of the test — presumably intentional; confirm.
+publish_consume(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    declare(Ch, ?QNAME),
+
+    %% Baseline: default mode round-trip.
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    consume(Ch, ?QNAME, ack),
+    [assert_delivered(Ch, ack, P) || P <- lists:seq(1, ?MESSAGE_COUNT)],
+
+    %% Two batches published while lazy; drain one, flip back, drain the
+    %% other.
+    set_ha_mode_policy(Config, A, <<"lazy">>),
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    [assert_delivered(Ch, ack, P) || P <- lists:seq(1, ?MESSAGE_COUNT)],
+
+    set_ha_mode_policy(Config, A, <<"default">>),
+    [assert_delivered(Ch, ack, P) || P <- lists:seq(1, ?MESSAGE_COUNT)],
+
+    %% Publish straddling two mode flips, then drain both batches.
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    set_ha_mode_policy(Config, A, <<"lazy">>),
+    rabbit_ct_client_helpers:publish(Ch, ?QNAME, ?MESSAGE_COUNT),
+    set_ha_mode_policy(Config, A, <<"default">>),
+    [assert_delivered(Ch, ack, P) || P <- lists:seq(1, ?MESSAGE_COUNT)],
+
+    set_ha_mode_policy(Config, A, <<"lazy">>),
+    [assert_delivered(Ch, ack, P) || P <- lists:seq(1, ?MESSAGE_COUNT)],
+
+    cancel(Ch),
+
+    passed.
+
+%%----------------------------------------------------------------------------
+
+%% Declare a durable queue (optionally with x-arguments).
+declare(Ch, Q) ->
+    declare(Ch, Q, []).
+
+declare(Ch, Q, Args) ->
+    amqp_channel:call(Ch, #'queue.declare'{queue = Q,
+                                           durable = true,
+                                           arguments = Args}).
+
+%% Subscribe with fixed consumer tag <<"ctag">>; no_ack only when the
+%% caller passes the atom `no_ack`.  Blocks until basic.consume_ok.
+consume(Ch, Q, Ack) ->
+    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+                                                no_ack = Ack =:= no_ack,
+                                                consumer_tag = <<"ctag">>},
+                           self()),
+    receive
+        #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+            ok
+    end.
+
+%% Cancel the <<"ctag">> subscription created by consume/3.
+cancel(Ch) ->
+    amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}).
+
+%% Block until the next delivery arrives and assert its payload is the
+%% integer Payload (as produced by rabbit_ct_client_helpers:publish).
+assert_delivered(Ch, Ack, Payload) ->
+    PBin = payload2bin(Payload),
+    receive
+        {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} ->
+            PBin = PBin2,
+            maybe_ack(Ch, Ack, DTag)
+    end.
+
+%% Only the atom `do_ack` actually acks; anything else is a no-op.
+maybe_ack(Ch, do_ack, DTag) ->
+    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
+    DTag;
+maybe_ack(_Ch, _, DTag) ->
+    DTag.
+
+%% Integer payload -> binary, matching the publisher's encoding.
+payload2bin(Int) -> list_to_binary(integer_to_list(Int)).
+
+%% (Re)apply an HA-all policy carrying the given queue-mode for all queues.
+set_ha_mode_policy(Config, Node, Mode) ->
+    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, Node, <<".*">>, <<"all">>,
+                                                [{<<"queue-mode">>, Mode}]).
+
+
+%% Poll every 100ms until the queue reports Mode, giving up (returning
+%% the atom `fail`, which breaks the caller's `ok =` match) after Max ms.
+wait_for_queue_mode(_Node, _Q, _Mode, Max) when Max < 0 ->
+    fail;
+wait_for_queue_mode(Node, Q, Mode, Max) ->
+    case get_queue_mode(Node, Q) of
+        Mode -> ok;
+        _    -> timer:sleep(100),
+                wait_for_queue_mode(Node, Q, Mode, Max - 100)
+    end.
+
+%% Assert (via match) that the queue's current mode equals Expected.
+assert_queue_mode(Node, Q, Expected) ->
+    Actual = get_queue_mode(Node, Q),
+    Expected = Actual.
+
+%% Read the `mode` entry out of the queue's backing_queue_status on Node.
+get_queue_mode(Node, Q) ->
+    QNameRes = rabbit_misc:r(<<"/">>, queue, Q),
+    {ok, AMQQueue} =
+        rpc:call(Node, rabbit_amqqueue, lookup, [QNameRes]),
+    [{backing_queue_status, Status}] =
+        rpc:call(Node, rabbit_amqqueue, info,
+                 [AMQQueue, [backing_queue_status]]),
+    proplists:get_value(mode, Status).
diff --git a/test/many_node_ha_SUITE.erl b/test/many_node_ha_SUITE.erl
new file mode 100644
index 0000000000..22b39e7a3d
--- /dev/null
+++ b/test/many_node_ha_SUITE.erl
@@ -0,0 +1,117 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(many_node_ha_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+%% Common Test entry point: one group, sized for a 6-node cluster (see
+%% init_per_group/2).
+all() ->
+    [
+      {group, cluster_size_6}
+    ].
+
+groups() ->
+    [
+      {cluster_size_6, [], [
+          kill_intermediate
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% The group name carries the cluster size; record it for
+%% init_per_testcase/2 to consume.
+init_per_group(cluster_size_6, Config) ->
+    rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, 6}
+      ]).
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Fresh clustered broker set per testcase, fully mirrored
+%% (set_ha_policy_all); ports offset per testcase to avoid collisions.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = ?config(rmq_nodes_count, Config),
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_clustered, true},
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps() ++ [
+        fun rabbit_ct_broker_helpers:set_ha_policy_all/1
+      ]).
+
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% HA smoke test on a 6-node cluster: declare the queue on A (the
+%% expected master), produce from F and consume on E while killing
+%% A..D mid-flight; every produced message must still reach the consumer.
+kill_intermediate(Config) ->
+    [A, B, C, D, E, F] = rabbit_ct_broker_helpers:get_node_configs(Config,
+      nodename),
+    %% Scale the message count down when running under cover.
+    Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000),
+    MasterChannel = rabbit_ct_client_helpers:open_channel(Config, A),
+    ConsumerChannel = rabbit_ct_client_helpers:open_channel(Config, E),
+    ProducerChannel = rabbit_ct_client_helpers:open_channel(Config, F),
+    Queue = <<"test">>,
+    amqp_channel:call(MasterChannel, #'queue.declare'{queue = Queue,
+                                                      auto_delete = false}),
+
+    %% TODO: this seems *highly* timing dependant - the assumption being
+    %% that the kill will work quickly enough that there will still be
+    %% some messages in-flight that we *must* receive despite the intervening
+    %% node deaths. It would be nice if we could find a means to do this
+    %% in a way that is not actually timing dependent.
+
+    %% Worse still, it assumes that killing the master will cause a
+    %% failover to Slave1, and so on. Nope.
+
+    ConsumerPid = rabbit_ha_test_consumer:create(ConsumerChannel,
+                                                 Queue, self(), false, Msgs),
+
+    ProducerPid = rabbit_ha_test_producer:create(ProducerChannel,
+                                                 Queue, self(), false, Msgs),
+
+    %% create a killer for the master and the first 3 slaves
+    [rabbit_ct_broker_helpers:kill_node_after(Config, Node, Time) ||
+        {Node, Time} <- [{A, 50},
+                         {B, 50},
+                         {C, 100},
+                         {D, 100}]],
+
+    %% verify that the consumer got all msgs, or die, or time out
+    rabbit_ha_test_producer:await_response(ProducerPid),
+    rabbit_ha_test_consumer:await_response(ConsumerPid),
+    ok.
+
diff --git a/test/partitions_SUITE.erl b/test/partitions_SUITE.erl
new file mode 100644
index 0000000000..b93e1ea9dd
--- /dev/null
+++ b/test/partitions_SUITE.erl
@@ -0,0 +1,438 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(partitions_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+%% We set ticktime to 1s and setuptime is 7s so to make sure it
+%% passes...
+-define(DELAY, 8000).
+
+%% Two top-level groups keyed by net_ticktime (1s for fast partition
+%% detection, 10s for the false-promises tests), each with nested
+%% cluster-size subgroups consumed by init_per_group/2.
+all() ->
+    [
+      {group, net_ticktime_1},
+      {group, net_ticktime_10}
+    ].
+
+groups() ->
+    [
+      {net_ticktime_1, [], [
+          {cluster_size_2, [], [
+              ctl_ticktime_sync,
+              prompt_disconnect_detection
+            ]},
+          {cluster_size_3, [], [
+              autoheal,
+              autoheal_after_pause_if_all_down,
+              ignore,
+              pause_if_all_down_on_blocked,
+              pause_if_all_down_on_down,
+              pause_minority_on_blocked,
+              pause_minority_on_down,
+              partial_false_positive,
+              partial_to_full,
+              partial_pause_minority,
+              partial_pause_if_all_down
+            ]}
+        ]},
+      {net_ticktime_10, [], [
+          {cluster_size_2, [], [
+              pause_if_all_down_false_promises_mirrored,
+              pause_if_all_down_false_promises_unmirrored,
+              pause_minority_false_promises_mirrored,
+              pause_minority_false_promises_unmirrored
+            ]}
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Suite setup also starts the proxy manager and swaps the Erlang
+%% distribution module for inet_proxy_dist, so partitions can be
+%% simulated by blocking the TCP proxies.
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config, [
+        fun enable_dist_proxy_manager/1
+      ]).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Group names encode either the kernel net_ticktime or the cluster size.
+init_per_group(net_ticktime_1, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{net_ticktime, 1}]);
+init_per_group(net_ticktime_10, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]);
+init_per_group(cluster_size_2, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]);
+init_per_group(cluster_size_3, Config) ->
+    rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]).
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Nodes are started unclustered, the dist proxy is installed on each,
+%% and only then are they clustered — so all inter-node dist traffic
+%% flows through the proxies from the start.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = ?config(rmq_nodes_count, Config),
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_clustered, false},
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps() ++ [
+        fun enable_dist_proxy/1,
+        fun rabbit_ct_broker_helpers:cluster_nodes/1
+      ]).
+
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% Suite step: start the central proxy manager on the CT node and make
+%% the brokers boot with inet_proxy_dist as their distribution module.
+enable_dist_proxy_manager(Config) ->
+    inet_tcp_proxy_manager:start(),
+    rabbit_ct_helpers:set_config(Config,
+      {erlang_dist_module, inet_proxy_dist}).
+
+%% Testcase step: start an inet_tcp_proxy on every broker node, then make
+%% each node drop its existing dist connections so they get re-established
+%% through the proxies.
+enable_dist_proxy(Config) ->
+    NodeConfigs = rabbit_ct_broker_helpers:get_node_configs(Config),
+    Nodes = [?config(nodename, NodeConfig) || NodeConfig <- NodeConfigs],
+    ManagerNode = node(),
+    ok = lists:foreach(
+      fun(NodeConfig) ->
+          ok = rabbit_ct_broker_helpers:rpc(Config,
+            ?config(nodename, NodeConfig),
+            ?MODULE, enable_dist_proxy_on_node,
+            [NodeConfig, ManagerNode, Nodes])
+      end, NodeConfigs),
+    Config.
+
+%% Runs ON the broker node (via rpc): start its proxy between the real
+%% dist port and the proxy port, and reconnect to all the other nodes.
+enable_dist_proxy_on_node(NodeConfig, ManagerNode, Nodes) ->
+    Nodename = ?config(nodename, NodeConfig),
+    DistPort = ?config(tcp_port_erlang_dist, NodeConfig),
+    ProxyPort = ?config(tcp_port_erlang_dist_proxy, NodeConfig),
+    ok = inet_tcp_proxy:start(ManagerNode, DistPort, ProxyPort),
+    ok = inet_tcp_proxy:reconnect(Nodes -- [Nodename]).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% With the default `ignore` handling, after partitioning A from B and C
+%% (and healing), every node keeps reporting the partition it saw.
+ignore(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    block_unblock([{A, B}, {A, C}]),
+    timer:sleep(?DELAY),
+    [B, C] = partitions(A),
+    [A] = partitions(B),
+    [A] = partitions(C),
+    ok.
+
+%% pause_minority: A survives losing one of three nodes, but must pause
+%% itself once it is in a minority (two peers dead).
+pause_minority_on_down(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, pause_minority),
+
+    true = is_running(A),
+
+    rabbit_ct_broker_helpers:kill_node(Config, B),
+    timer:sleep(?DELAY),
+    true = is_running(A),
+
+    rabbit_ct_broker_helpers:kill_node(Config, C),
+    await_running(A, false),
+    ok.
+
+%% pause_minority: the same, but the minority is created by blocking
+%% dist traffic rather than killing nodes.
+pause_minority_on_blocked(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, pause_minority),
+    pause_on_blocked(A, B, C).
+
+%% pause_if_all_down with [C] as the preferred set: losing B changes
+%% nothing, but once C (the whole preferred set) is dead, A must pause.
+pause_if_all_down_on_down(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, {pause_if_all_down, [C], ignore}),
+    [(true = is_running(N)) || N <- [A, B, C]],
+
+    rabbit_ct_broker_helpers:kill_node(Config, B),
+    timer:sleep(?DELAY),
+    [(true = is_running(N)) || N <- [A, C]],
+
+    rabbit_ct_broker_helpers:kill_node(Config, C),
+    timer:sleep(?DELAY),
+    await_running(A, false),
+    ok.
+
+%% pause_if_all_down, partition variant: blocking A away from both B and
+%% C cuts it off from the preferred set, so A must pause.
+pause_if_all_down_on_blocked(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, {pause_if_all_down, [C], ignore}),
+    pause_on_blocked(A, B, C).
+
+%% Shared body for the *_on_blocked cases: partition A from {B, C}, check
+%% A pauses while B and C keep running, heal, check A resumes, and verify
+%% the majority side never recorded a partition.
+pause_on_blocked(A, B, C) ->
+    [(true = is_running(N)) || N <- [A, B, C]],
+    block([{A, B}, {A, C}]),
+    await_running(A, false),
+    [await_running(N, true) || N <- [B, C]],
+    unblock([{A, B}, {A, C}]),
+    [await_running(N, true) || N <- [A, B, C]],
+    Status = rpc:call(B, rabbit_mnesia, status, []),
+    [] = rabbit_misc:pget(partitions, Status),
+    ok.
+
+%%% Make sure we do not confirm any messages after a partition has
+%%% happened but before we pause, since any such confirmations would be
+%%% lies.
+%%%
+%%% This test has to use an AB cluster (not ABC) since GM ends up
+%%% taking longer to detect down slaves when there are more nodes and
+%%% we close the window by mistake.
+%%%
+%%% In general there are quite a few ways to accidentally cause this
+%%% test to pass since there are a lot of things in the broker that can
+%%% suddenly take several seconds to time out when TCP connections
+%%% won't establish.
+
+%% Four wrappers around pause_false_promises/2, covering each pause mode
+%% with and without mirroring (set_ha_policy makes <<"test">> HA-all).
+pause_minority_false_promises_mirrored(Config) ->
+    rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
+    pause_false_promises(Config, pause_minority).
+
+pause_minority_false_promises_unmirrored(Config) ->
+    pause_false_promises(Config, pause_minority).
+
+%% For pause_if_all_down, B is the preferred node so blocking A<->B makes
+%% A pause.
+pause_if_all_down_false_promises_mirrored(Config) ->
+    rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<".*">>, <<"all">>),
+    B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+    pause_false_promises(Config, {pause_if_all_down, [B], ignore}).
+
+pause_if_all_down_false_promises_unmirrored(Config) ->
+    B = rabbit_ct_broker_helpers:get_node_config(Config, 1, nodename),
+    pause_false_promises(Config, {pause_if_all_down, [B], ignore}).
+
+%% Shared body for the *_false_promises tests above. On an AB cluster,
+%% set the given partition-handling mode on A only, publish with
+%% confirms on A while a background process cuts the A-B link after 1s,
+%% then check that every delivery A confirmed also survived on B once
+%% the partition healed (Confirmed =< Survived).
+pause_false_promises(Config, ClusterPartitionHandling) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, [A], ClusterPartitionHandling),
+    ChA = rabbit_ct_client_helpers:open_channel(Config, A),
+    ChB = rabbit_ct_client_helpers:open_channel(Config, B),
+    amqp_channel:call(ChB, #'queue.declare'{queue = <<"test">>,
+                                            durable = true}),
+    amqp_channel:call(ChA, #'confirm.select'{}),
+    amqp_channel:register_confirm_handler(ChA, self()),
+
+    %% Cause a partition after 1s
+    Self = self(),
+    spawn_link(fun () ->
+                       timer:sleep(1000),
+                       %%io:format(user, "~p BLOCK~n", [calendar:local_time()]),
+                       block([{A, B}]),
+                       unlink(Self)
+               end),
+
+    %% Publish large no of messages, see how many we get confirmed
+    [amqp_channel:cast(ChA, #'basic.publish'{routing_key = <<"test">>},
+                       #amqp_msg{props = #'P_basic'{delivery_mode = 1}}) ||
+        _ <- lists:seq(1, 100000)],
+    %%io:format(user, "~p finish publish~n", [calendar:local_time()]),
+
+    %% Time for the partition to be detected. We don't put this sleep
+    %% in receive_acks since otherwise we'd have another similar sleep
+    %% at the end.
+    timer:sleep(30000),
+    Confirmed = receive_acks(0),
+    %%io:format(user, "~p got acks~n", [calendar:local_time()]),
+    await_running(A, false),
+    %%io:format(user, "~p A stopped~n", [calendar:local_time()]),
+
+    unblock([{A, B}]),
+    await_running(A, true),
+
+    %% But how many made it onto the rest of the cluster?
+    %% Re-declaring the durable queue on B returns its message count.
+    #'queue.declare_ok'{message_count = Survived} =
+        amqp_channel:call(ChB, #'queue.declare'{queue = <<"test">>,
+                                                durable = true}),
+    %%io:format(user, "~p queue declared~n", [calendar:local_time()]),
+    case Confirmed > Survived of
+        true -> io:format("Confirmed=~p Survived=~p~n", [Confirmed, Survived]);
+        false -> ok
+    end,
+    true = (Confirmed =< Survived),
+
+    rabbit_ct_client_helpers:close_channel(ChB),
+    rabbit_ct_client_helpers:close_channel(ChA),
+    ok.
+
+%% Drain 'basic.ack' confirms from the mailbox; return the last delivery
+%% tag seen, or the accumulator if nothing arrives within ?DELAY.
+%% NOTE(review): the caller treats the final tag as the number of
+%% confirmed messages — this assumes tags are assigned consecutively
+%% from 1; confirm against the AMQP client's confirm semantics.
+receive_acks(Max) ->
+    receive
+        #'basic.ack'{delivery_tag = DTag} ->
+            receive_acks(DTag)
+    after ?DELAY ->
+            Max
+    end.
+
+%% Declare 100 queues via B, block the A-B link, then query queue info
+%% from A with a ?DELAY rpc timeout: the call must come back (empty)
+%% promptly rather than waiting for a TCP setup timeout per queue.
+prompt_disconnect_detection(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    ChB = rabbit_ct_client_helpers:open_channel(Config, B),
+    [amqp_channel:call(ChB, #'queue.declare'{}) || _ <- lists:seq(1, 100)],
+    block([{A, B}]),
+    timer:sleep(?DELAY),
+    %% We want to make sure we do not end up waiting for setuptime *
+    %% no of queues. Unfortunately that means we need a timeout...
+    [] = rabbit_ct_broker_helpers:rpc(Config, A,
+      rabbit_amqqueue, info_all, [<<"/">>], ?DELAY),
+    rabbit_ct_client_helpers:close_channel(ChB),
+    ok.
+
+%% A 5s eval must succeed even though the server runs with a 1s
+%% net_ticktime — i.e. rabbitmqctl's connection must not be ticked out.
+ctl_ticktime_sync(Config) ->
+    %% Server has 1s net_ticktime, make sure ctl doesn't get disconnected
+    Cmd = ["eval", "timer:sleep(5000)."],
+    {ok, "ok\n"} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd).
+
+%% NB: we test full and partial partitions here.
+autoheal(Config) ->
+    set_mode(Config, autoheal),
+    do_autoheal(Config).
+
+%% Same scenario, but autoheal is the fallback action of a
+%% pause_if_all_down configuration watching B and C.
+autoheal_after_pause_if_all_down(Config) ->
+    [_, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, {pause_if_all_down, [B, C], autoheal}),
+    do_autoheal(Config).
+
+%% Block/unblock increasingly large sets of links; after each round the
+%% cluster must heal itself: all nodes listening, no partitions left.
+do_autoheal(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    Test = fun (Pairs) ->
+                   block_unblock(Pairs),
+                   %% Sleep to make sure all the partitions are noticed
+                   %% ?DELAY for the net_tick timeout
+                   timer:sleep(?DELAY),
+                   [await_listening(N, true) || N <- [A, B, C]],
+                   [await_partitions(N, []) || N <- [A, B, C]]
+           end,
+    Test([{B, C}]),
+    Test([{A, C}, {B, C}]),
+    Test([{A, B}, {A, C}, {B, C}]),
+    ok.
+
+%% Stagger the blocking of A-B and A-C so that B's timeout of A races
+%% C's; the asymmetric view must not be treated as a partial partition.
+partial_false_positive(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    block([{A, B}]),
+    timer:sleep(1000),
+    block([{A, C}]),
+    timer:sleep(?DELAY),
+    unblock([{A, B}, {A, C}]),
+    timer:sleep(?DELAY),
+    %% When B times out A's connection, it will check with C. C will
+    %% not have timed out A yet, but already it can't talk to it. We
+    %% need to not consider this a partial partition; B and C should
+    %% still talk to each other.
+    [B, C] = partitions(A),
+    [A] = partitions(B),
+    [A] = partitions(C),
+    ok.
+
+%% A momentary A-B block must escalate to one of the legal full
+%% partition outcomes — which one depends on DOWN-message ordering.
+partial_to_full(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    block_unblock([{A, B}]),
+    timer:sleep(?DELAY),
+    %% There are several valid ways this could go, depending on how
+    %% the DOWN messages race: either A gets disconnected first and BC
+    %% stay together, or B gets disconnected first and AC stay
+    %% together, or both make it through and all three get
+    %% disconnected.
+    case {partitions(A), partitions(B), partitions(C)} of
+        {[B, C], [A], [A]} -> ok;
+        {[B], [A, C], [B]} -> ok;
+        {[B, C], [A, C], [A, B]} -> ok;
+        Partitions -> exit({partitions, Partitions})
+    end.
+
+%% Under pause_minority an A-B block must pause both A and B (each sees
+%% itself in a minority) while C stays up; unblocking restores all.
+partial_pause_minority(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, pause_minority),
+    block([{A, B}]),
+    [await_running(N, false) || N <- [A, B]],
+    await_running(C, true),
+    unblock([{A, B}]),
+    [await_listening(N, true) || N <- [A, B, C]],
+    [await_partitions(N, []) || N <- [A, B, C]],
+    ok.
+
+%% Under {pause_if_all_down, [B]} only A (which lost sight of B) pauses;
+%% B and C stay up; unblocking restores all with no partitions left.
+partial_pause_if_all_down(Config) ->
+    [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    set_mode(Config, {pause_if_all_down, [B], ignore}),
+    block([{A, B}]),
+    await_running(A, false),
+    [await_running(N, true) || N <- [B, C]],
+    unblock([{A, B}]),
+    [await_listening(N, true) || N <- [A, B, C]],
+    [await_partitions(N, []) || N <- [A, B, C]],
+    ok.
+
+%% Set rabbit's cluster_partition_handling application env on all nodes.
+set_mode(Config, Mode) ->
+    rabbit_ct_broker_helpers:rpc_all(Config,
+      application, set_env, [rabbit, cluster_partition_handling, Mode]).
+
+%% Same, but only on the given nodes. NOTE(review): the second argument
+%% is passed straight through to rabbit_ct_broker_helpers:rpc — callers
+%% here pass a list ([A]); confirm the helper accepts node lists.
+set_mode(Config, Nodes, Mode) ->
+    rabbit_ct_broker_helpers:rpc(Config, Nodes,
+      application, set_env, [rabbit, cluster_partition_handling, Mode]).
+
+%% Block the given node pairs, wait one net-tick window, then unblock.
+block_unblock(Pairs) ->
+    block(Pairs),
+    timer:sleep(?DELAY),
+    unblock(Pairs).
+
+block(Pairs) -> [block(X, Y) || {X, Y} <- Pairs].
+unblock(Pairs) -> [allow(X, Y) || {X, Y} <- Pairs].
+
+%% Ask the node monitor for its partition list. If the rpc fails with a
+%% "normal-ish" exit (node restarting), retry after 1s; an abnormal exit
+%% returns the {badrpc, _} tuple itself, which will then fail the
+%% caller's pattern match — an intentional crash.
+partitions(Node) ->
+    case rpc:call(Node, rabbit_node_monitor, partitions, []) of
+        {badrpc, {'EXIT', E}} = R -> case rabbit_misc:is_abnormal_exit(E) of
+                                         true -> R;
+                                         false -> timer:sleep(1000),
+                                                  partitions(Node)
+                                     end;
+        Partitions -> Partitions
+    end.
+
+%% Sever traffic in both directions between X and Y via the test proxy.
+block(X, Y) ->
+    rpc:call(X, inet_tcp_proxy, block, [Y]),
+    rpc:call(Y, inet_tcp_proxy, block, [X]).
+
+%% Restore traffic in both directions between X and Y.
+allow(X, Y) ->
+    rpc:call(X, inet_tcp_proxy, allow, [Y]),
+    rpc:call(Y, inet_tcp_proxy, allow, [X]).
+
+%% Poll until the node reaches the expected state. NOTE(review): await/3
+%% has no timeout — an unreachable state hangs until the CT timetrap
+%% kills the testcase.
+await_running (Node, Bool) -> await(Node, Bool, fun is_running/1).
+await_listening (Node, Bool) -> await(Node, Bool, fun is_listening/1).
+await_partitions(Node, Parts) -> await(Node, Parts, fun partitions/1).
+
+%% Generic 100ms poll loop; any non-matching result (including
+%% {badrpc, _} while the node is down) just causes another retry.
+await(Node, Res, Fun) ->
+    case Fun(Node) of
+        Res -> ok;
+        _ -> timer:sleep(100),
+             await(Node, Res, Fun)
+    end.
+
+is_running(Node) -> rpc:call(Node, rabbit, is_running, []).
+
+%% True iff the node reports at least one network listener.
+is_listening(Node) ->
+    case rpc:call(Node, rabbit_networking, node_listeners, [Node]) of
+        [] -> false;
+        [_|_] -> true;
+        _ -> false
+    end.
diff --git a/test/priority_queue_SUITE.erl b/test/priority_queue_SUITE.erl
new file mode 100644
index 0000000000..5df5686090
--- /dev/null
+++ b/test/priority_queue_SUITE.erl
@@ -0,0 +1,558 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+%% Integration tests for rabbit_priority_queue, run via common_test on
+%% 2- and 3-node clusters. export_all is conventional for CT suites.
+-module(priority_queue_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+all() ->
+    [
+      {group, cluster_size_2},
+      {group, cluster_size_3}
+    ].
+
+%% NOTE(review): ram_duration/1 (defined below) is not listed in any
+%% group, so it never runs — confirm whether that is intentional.
+groups() ->
+    [
+      {cluster_size_2, [], [
+          {parallel_tests, [parallel], [
+              ackfold,
+              drop,
+              dropwhile_fetchwhile,
+              info_head_message_timestamp,
+              matching,
+              mirror_queue_sync,
+              mirror_queue_sync_priority_above_max,
+              mirror_queue_sync_priority_above_max_pending_ack,
+              purge,
+              requeue,
+              resume,
+              simple_order,
+              straight_through
+            ]},
+          {non_parallel_tests, [], [
+              recovery %% Restart RabbitMQ.
+            ]}
+        ]},
+      {cluster_size_3, [], [
+          {parallel_tests, [parallel], [
+              mirror_queue_auto_ack
+            ]}
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Start a broker cluster of the size named by the group; both sized
+%% groups share the same steps and differ only in rmq_nodes_count.
+init_per_group(cluster_size_2, Config) ->
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, 2}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps());
+init_per_group(cluster_size_3, Config) ->
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, 3}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps());
+init_per_group(_, Config) ->
+    Config.
+
+%% Tear brokers down only for the sized groups; inner (parallel_tests
+%% etc.) groups fall through to the catch-all clause.
+end_per_group(ClusterSizeGroup, Config)
+when ClusterSizeGroup =:= cluster_size_2
+orelse ClusterSizeGroup =:= cluster_size_3 ->
+    rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps());
+end_per_group(_, Config) ->
+    Config.
+
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase).
+
+end_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_finished(Config, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% The BQ API is used in all sorts of places in all sorts of
+%% ways. Therefore we have to jump through a few different hoops
+%% in order to integration-test it.
+%%
+%% * start/1, stop/0, init/3, terminate/2, delete_and_terminate/2
+%%   - starting and stopping rabbit. durable queues / persistent msgs needed
+%%     to test recovery
+%%
+%% * publish/5, drain_confirmed/1, fetch/2, ack/2, is_duplicate/2, msg_rates/1,
+%%   needs_timeout/1, timeout/1, invoke/3, resume/1 [0]
+%%   - regular publishing and consuming, with confirms and acks and durability
+%%
+%% * publish_delivered/4    - publish with acks straight through
+%% * discard/3              - publish without acks straight through
+%% * dropwhile/2            - expire messages without DLX
+%% * fetchwhile/4           - expire messages with DLX
+%% * ackfold/4              - reject messages with DLX
+%% * requeue/2              - reject messages without DLX
+%% * drop/2                 - maxlen messages without DLX
+%% * purge/1                - issue AMQP queue.purge
+%% * purge_acks/1           - mirror queue explicit sync with unacked msgs
+%% * fold/3                 - mirror queue explicit sync
+%% * depth/1                - mirror queue implicit sync detection
+%% * len/1, is_empty/1      - info items
+%% * handle_pre_hibernate/1 - hibernation
+%%
+%% * set_ram_duration_target/2, ram_duration/1, status/1
+%%   - maybe need unit testing?
+%%
+%% [0] publish enough to get credit flow from msg store
+
+%% Persist messages on a durable priority queue, restart the broker,
+%% and check they come back in priority order.
+recovery(Config) ->
+    {Conn, Ch} = open(Config),
+    Q = <<"recovery-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+    amqp_connection:close(Conn),
+
+    %% TODO This terminates the automatically open connection and breaks
+    %% coverage.
+    rabbit_ct_broker_helpers:restart_broker(Config, 0),
+
+    {Conn2, Ch2} = open(Config),
+    get_all(Ch2, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
+    delete(Ch2, Q),
+    amqp_connection:close(Conn2),
+    passed.
+
+%% Interleaved-priority publishes must always be consumed highest
+%% priority first, with both ack and no-ack consumption.
+simple_order(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"simple_order-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+    get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
+    publish(Ch, Q, [2, 3, 1, 2, 3, 1, 2, 3, 1]),
+    get_all(Ch, Q, no_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
+    publish(Ch, Q, [3, 1, 2, 3, 1, 2, 3, 1, 2]),
+    get_all(Ch, Q, do_ack, [3, 3, 3, 2, 2, 2, 1, 1, 1]),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% Out-of-range and missing priorities: 10 is clamped down to the
+%% queue's max (5) and an absent priority sorts with 0.
+matching(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"matching-queue">>,
+    declare(Ch, Q, 5),
+    %% We round priority down, and 0 is the default
+    publish(Ch, Q, [undefined, 0, 5, 10, undefined]),
+    get_all(Ch, Q, do_ack, [5, 10, undefined, 0, undefined]),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% Publish enough confirmed messages to trigger credit flow from the
+%% message store and make sure the queue resumes (confirms all arrive).
+resume(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"resume-queue">>,
+    declare(Ch, Q, 5),
+    amqp_channel:call(Ch, #'confirm.select'{}),
+    publish_many(Ch, Q, 10000),
+    amqp_channel:wait_for_confirms(Ch),
+    amqp_channel:call(Ch, #'queue.purge'{queue = Q}), %% Assert it exists
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% With an active consumer each publish is delivered immediately
+%% (straight-through path), for both ack modes; the queue stays empty.
+straight_through(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"straight_through-queue">>,
+    declare(Ch, Q, 3),
+    [begin
+         consume(Ch, Q, Ack),
+         [begin
+              publish1(Ch, Q, P),
+              assert_delivered(Ch, Ack, P)
+          end || P <- [1, 2, 3]],
+         cancel(Ch)
+     end || Ack <- [do_ack, no_ack]],
+    get_empty(Ch, Q),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% A 1ms per-message TTL must empty the queue, both without a DLX
+%% (dropwhile path) and with one (fetchwhile path).
+dropwhile_fetchwhile(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"dropwhile_fetchwhile-queue">>,
+    [begin
+         declare(Ch, Q, Args ++ arguments(3)),
+         publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+         timer:sleep(10),
+         get_empty(Ch, Q),
+         delete(Ch, Q)
+     end ||
+        Args <- [[{<<"x-message-ttl">>, long, 1}],
+                 [{<<"x-message-ttl">>, long, 1},
+                  {<<"x-dead-letter-exchange">>, longstr, <<"amq.fanout">>}]
+                ]],
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% Reject (nack, no requeue) all messages: they must be dead-lettered to
+%% Q2 preserving priority order. NOTE(review): <<"ackfolq-queue1">>
+%% looks like a typo for "ackfold"; harmless since both ends use the
+%% same binary, so it is left untouched here.
+ackfold(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"ackfolq-queue1">>,
+    Q2 = <<"ackfold-queue2">>,
+    declare(Ch, Q,
+            [{<<"x-dead-letter-exchange">>, longstr, <<>>},
+             {<<"x-dead-letter-routing-key">>, longstr, Q2}
+             | arguments(3)]),
+    declare(Ch, Q2, none),
+    publish(Ch, Q, [1, 2, 3]),
+    [_, _, DTag] = get_all(Ch, Q, manual_ack, [3, 2, 1]),
+    amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+                                        multiple = true,
+                                        requeue = false}),
+    timer:sleep(100),
+    get_all(Ch, Q2, do_ack, [3, 2, 1]),
+    delete(Ch, Q),
+    delete(Ch, Q2),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% Reject with requeue: messages must be redeliverable in the same
+%% priority order.
+requeue(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"requeue-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3]),
+    [_, _, DTag] = get_all(Ch, Q, manual_ack, [3, 2, 1]),
+    amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DTag,
+                                        multiple = true,
+                                        requeue = true}),
+    get_all(Ch, Q, do_ack, [3, 2, 1]),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% x-max-length 4 on 9 publishes: overflow is dropped from the head.
+drop(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"drop-queue">>,
+    declare(Ch, Q, [{<<"x-max-length">>, long, 4} | arguments(3)]),
+    publish(Ch, Q, [1, 2, 3, 1, 2, 3, 1, 2, 3]),
+    %% We drop from the head, so this is according to the "spec" even
+    %% if not likely to be what the user wants.
+    get_all(Ch, Q, do_ack, [2, 1, 1, 1]),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% queue.purge must empty a priority queue.
+purge(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"purge-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3]),
+    amqp_channel:call(Ch, #'queue.purge'{queue = Q}),
+    get_empty(Ch, Q),
+    delete(Ch, Q),
+    rabbit_ct_client_helpers:close_channel(Ch),
+    passed.
+
+%% Runs the real test (below) inside the broker node via rpc, since it
+%% drives the rabbit_priority_queue BQ module directly.
+info_head_message_timestamp(Config) ->
+    passed = rabbit_ct_broker_helpers:rpc(Config, 0,
+      ?MODULE, info_head_message_timestamp1, [Config]).
+
+%% Exercises the head_message_timestamp info item of a priority BQ:
+%% empty queue yields '', the head is the highest-priority message, and
+%% an unacked (fetched-but-not-acked) message still counts as the head.
+info_head_message_timestamp1(_Config) ->
+    QName = rabbit_misc:r(<<"/">>, queue,
+      <<"info_head_message_timestamp-queue">>),
+    Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
+    Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 2}]},
+    PQ = rabbit_priority_queue,
+    BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+    %% The queue is empty: no timestamp.
+    true = PQ:is_empty(BQS1),
+    '' = PQ:info(head_message_timestamp, BQS1),
+    %% Publish one message with timestamp 1000.
+    Msg1 = #basic_message{
+      id = msg1,
+      content = #content{
+        properties = #'P_basic'{
+          priority = 1,
+          timestamp = 1000
+        }},
+      is_persistent = false
+    },
+    BQS2 = PQ:publish(Msg1, #message_properties{size = 0}, false, self(),
+      noflow, BQS1),
+    1000 = PQ:info(head_message_timestamp, BQS2),
+    %% Publish a higher priority message with no timestamp.
+    Msg2 = #basic_message{
+      id = msg2,
+      content = #content{
+        properties = #'P_basic'{
+          priority = 2
+        }},
+      is_persistent = false
+    },
+    BQS3 = PQ:publish(Msg2, #message_properties{size = 0}, false, self(),
+      noflow, BQS2),
+    '' = PQ:info(head_message_timestamp, BQS3),
+    %% Consume message with no timestamp.
+    {{Msg2, _, _}, BQS4} = PQ:fetch(false, BQS3),
+    1000 = PQ:info(head_message_timestamp, BQS4),
+    %% Consume message with timestamp 1000, but do not acknowledge it
+    %% yet. The goal is to verify that the unacknowledged message's
+    %% timestamp is returned.
+    {{Msg1, _, AckTag}, BQS5} = PQ:fetch(true, BQS4),
+    1000 = PQ:info(head_message_timestamp, BQS5),
+    %% Ack message. The queue is empty now.
+    {[msg1], BQS6} = PQ:ack([AckTag], BQS5),
+    true = PQ:is_empty(BQS6),
+    '' = PQ:info(head_message_timestamp, BQS6),
+    PQ:delete_and_terminate(a_whim, BQS6),
+    passed.
+
+%% Smoke-test ram_duration/set_ram_duration_target on a priority BQ.
+%% NOTE(review): not referenced from groups/0, so currently never runs.
+ram_duration(_Config) ->
+    QName = rabbit_misc:r(<<"/">>, queue, <<"ram_duration-queue">>),
+    Q0 = rabbit_amqqueue:pseudo_queue(QName, self()),
+    Q = Q0#amqqueue{arguments = [{<<"x-max-priority">>, long, 5}]},
+    PQ = rabbit_priority_queue,
+    BQS1 = PQ:init(Q, new, fun(_, _) -> ok end),
+    {_Duration1, BQS2} = PQ:ram_duration(BQS1),
+    BQS3 = PQ:set_ram_duration_target(infinity, BQS2),
+    BQS4 = PQ:set_ram_duration_target(1, BQS3),
+    {_Duration2, BQS5} = PQ:ram_duration(BQS4),
+    PQ:delete_and_terminate(a_whim, BQS5),
+    passed.
+
+%% Explicit sync of a mirrored priority queue where the slave was
+%% attached mid-stream and some messages are unacked.
+%% NOTE(review): unlike its siblings, this test neither deletes the
+%% queue nor closes the channel — confirm that is intentional.
+mirror_queue_sync(Config) ->
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    Q = <<"mirror_queue_sync-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3]),
+    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, 0,
+      <<"^mirror_queue_sync-queue$">>, <<"all">>),
+    publish(Ch, Q, [1, 2, 3, 1, 2, 3]),
+    %% master now has 9, slave 6.
+    get_partial(Ch, Q, manual_ack, [3, 3, 3, 2, 2, 2]),
+    %% So some but not all are unacked at the slave
+    Nodename0 = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    rabbit_ct_broker_helpers:control_action(sync_queue, Nodename0,
+      [binary_to_list(Q)], [{"-p", "/"}]),
+    wait_for_sync(Config, Nodename0, rabbit_misc:r(<<"/">>, queue, Q)),
+    passed.
+
+mirror_queue_sync_priority_above_max(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    %% Tests synchronisation of slaves when priority is higher than max priority.
+    %% This caused an infinite loop (and test timeout) before rabbitmq-server-795
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    Q = <<"mirror_queue_sync_priority_above_max-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [5, 5, 5]),
+    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
+      <<".*">>, <<"all">>),
+    rabbit_ct_broker_helpers:control_action(sync_queue, A,
+      [binary_to_list(Q)], [{"-p", "/"}]),
+    wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+    delete(Ch, Q),
+    passed.
+
+mirror_queue_sync_priority_above_max_pending_ack(Config) ->
+    [A, B] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    %% Tests synchronisation of slaves when priority is higher than max priority
+    %% and there are pending acks.
+    %% This caused an infinite loop (and test timeout) before rabbitmq-server-795
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    Q = <<"mirror_queue_sync_priority_above_max_pending_ack-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [5, 5, 5]),
+    %% Consume but 'forget' to acknowledge
+    get_without_ack(Ch, Q),
+    get_without_ack(Ch, Q),
+    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
+      <<".*">>, <<"all">>),
+    rabbit_ct_broker_helpers:control_action(sync_queue, A,
+      [binary_to_list(Q)], [{"-p", "/"}]),
+    wait_for_sync(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+    %% NOTE(review): synced_msgs/4 returns a boolean that is discarded
+    %% here, so these two calls assert nothing — they should probably be
+    %% matched against 'true'. Left unchanged in this patch view.
+    synced_msgs(Config, A, rabbit_misc:r(<<"/">>, queue, Q), 3),
+    synced_msgs(Config, B, rabbit_misc:r(<<"/">>, queue, Q), 3),
+    delete(Ch, Q),
+    passed.
+
+mirror_queue_auto_ack(Config) ->
+    A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    %% Check correct use of AckRequired in the notifications to the slaves.
+    %% If slaves are notified with AckRequired == true when it is false,
+    %% the slaves will crash with the depth notification as they will not
+    %% match the master delta.
+    %% Bug rabbitmq-server 687
+    Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+    Q = <<"mirror_queue_auto_ack-queue">>,
+    declare(Ch, Q, 3),
+    publish(Ch, Q, [1, 2, 3]),
+    ok = rabbit_ct_broker_helpers:set_ha_policy(Config, A,
+      <<".*">>, <<"all">>),
+    get_partial(Ch, Q, no_ack, [3, 2, 1]),
+
+    %% Retrieve slaves
+    SPids = slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q)),
+    [{SNode1, _SPid1}, {SNode2, SPid2}] = nodes_and_pids(SPids),
+
+    %% Restart one of the slaves so `request_depth` is triggered
+    rabbit_ct_broker_helpers:restart_node(Config, SNode1),
+
+    %% The alive slave must have the same pid after its neighbour is restarted
+    timer:sleep(3000), %% ugly but we can't know when the `depth` instruction arrives
+    Slaves = nodes_and_pids(slave_pids(Config, A, rabbit_misc:r(<<"/">>, queue, Q))),
+    SPid2 = proplists:get_value(SNode2, Slaves),
+
+    delete(Ch, Q),
+    passed.
+
+%%----------------------------------------------------------------------------
+
+%% Open a dedicated connection + channel to node 0.
+open(Config) ->
+    Conn = rabbit_ct_client_helpers:open_connection(Config, 0),
+    Ch = rabbit_ct_client_helpers:open_channel(Config, 0),
+    {Conn, Ch}.
+
+%% Declare a durable queue, either with explicit arguments or with just
+%% an x-max-priority of Max (via arguments/1).
+declare(Ch, Q, Args) when is_list(Args) ->
+    amqp_channel:call(Ch, #'queue.declare'{queue     = Q,
+                                           durable   = true,
+                                           arguments = Args});
+declare(Ch, Q, Max) ->
+    declare(Ch, Q, arguments(Max)).
+
+delete(Ch, Q) ->
+    amqp_channel:call(Ch, #'queue.delete'{queue = Q}).
+
+%% Publish one message per priority in Ps and wait for all confirms.
+publish(Ch, Q, Ps) ->
+    amqp_channel:call(Ch, #'confirm.select'{}),
+    [publish1(Ch, Q, P) || P <- Ps],
+    amqp_channel:wait_for_confirms(Ch).
+
+%% Publish N messages with random priorities 1..5.
+%% NOTE(review): uses the 'random' module, deprecated in later OTP in
+%% favour of 'rand' — fine for the OTP versions this patch targets.
+publish_many(_Ch, _Q, 0) -> ok;
+publish_many( Ch, Q, N) -> publish1(Ch, Q, random:uniform(5)),
+                           publish_many(Ch, Q, N - 1).
+
+%% Payload encodes the priority so consumers can assert on it.
+publish1(Ch, Q, P) ->
+    amqp_channel:cast(Ch, #'basic.publish'{routing_key = Q},
+                      #amqp_msg{props = props(P),
+                                payload = priority2bin(P)}).
+
+props(undefined) -> #'P_basic'{delivery_mode = 2};
+props(P) -> #'P_basic'{priority = P,
+                       delivery_mode = 2}.
+
+%% Subscribe with a fixed ctag and block until consume_ok arrives.
+consume(Ch, Q, Ack) ->
+    amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q,
+                                                no_ack = Ack =:= no_ack,
+                                                consumer_tag = <<"ctag">>},
+                           self()),
+    receive
+        #'basic.consume_ok'{consumer_tag = <<"ctag">>} ->
+            ok
+    end.
+
+cancel(Ch) ->
+    amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}).
+
+%% Wait for one delivery and assert its payload matches priority P.
+%% No 'after' clause: a missing delivery hangs until the CT timetrap.
+assert_delivered(Ch, Ack, P) ->
+    PBin = priority2bin(P),
+    receive
+        {#'basic.deliver'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} ->
+            PBin = PBin2,
+            maybe_ack(Ch, Ack, DTag)
+    end.
+
+%% Fetch exactly the expected sequence of priorities, then assert empty.
+get_all(Ch, Q, Ack, Ps) ->
+    DTags = get_partial(Ch, Q, Ack, Ps),
+    get_empty(Ch, Q),
+    DTags.
+
+%% Fetch the expected priorities, returning their delivery tags.
+get_partial(Ch, Q, Ack, Ps) ->
+    [get_ok(Ch, Q, Ack, P) || P <- Ps].
+
+get_empty(Ch, Q) ->
+    #'basic.get_empty'{} = amqp_channel:call(Ch, #'basic.get'{queue = Q}).
+
+%% basic.get one message and assert its payload encodes priority P.
+get_ok(Ch, Q, Ack, P) ->
+    PBin = priority2bin(P),
+    {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = PBin2}} =
+        amqp_channel:call(Ch, #'basic.get'{queue = Q,
+                                           no_ack = Ack =:= no_ack}),
+    PBin = PBin2,
+    maybe_ack(Ch, Ack, DTag).
+
+%% Fetch one message with no_ack=false and never ack it.
+get_without_ack(Ch, Q) ->
+    {#'basic.get_ok'{}, _} =
+        amqp_channel:call(Ch, #'basic.get'{queue = Q, no_ack = false}).
+
+maybe_ack(Ch, do_ack, DTag) ->
+    amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag}),
+    DTag;
+maybe_ack(_Ch, _, DTag) ->
+    DTag.
+
+arguments(none) -> [];
+arguments(Max) -> [{<<"x-max-priority">>, byte, Max}].
+
+priority2bin(undefined) -> <<"undefined">>;
+priority2bin(Int) -> list_to_binary(integer_to_list(Int)).
+
+%%----------------------------------------------------------------------------
+
+%% Poll every 100ms until synced/3 reports the queue as synchronised.
+%% No timeout here; the CT timetrap bounds the wait.
+wait_for_sync(Config, Nodename, Q) ->
+    case synced(Config, Nodename, Q) of
+        true -> ok;
+        false -> timer:sleep(100),
+                 wait_for_sync(Config, Nodename, Q)
+    end.
+
+%% True iff queue Q reports exactly one synchronised slave pid.
+synced(Config, Nodename, Q) ->
+    Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
+      rabbit_amqqueue, info_all, [<<"/">>, [name, synchronised_slave_pids]]),
+    [SSPids] = [Pids || [{name, Q1}, {synchronised_slave_pids, Pids}] <- Info,
+                        Q =:= Q1],
+    length(SSPids) =:= 1.
+
+%% Returns whether Q's message count equals Expected. NOTE(review):
+%% returns a boolean rather than asserting — callers must match on the
+%% result for this to act as a check (see the pending_ack test above).
+synced_msgs(Config, Nodename, Q, Expected) ->
+    Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
+      rabbit_amqqueue, info_all, [<<"/">>, [name, messages]]),
+    [M] = [M || [{name, Q1}, {messages, M}] <- Info, Q =:= Q1],
+    M =:= Expected.
+
+%% Pair each slave pid with the node it lives on.
+nodes_and_pids(SPids) ->
+    lists:zip([node(S) || S <- SPids], SPids).
+
+%% Fetch the slave pids of queue Q as seen from Nodename.
+slave_pids(Config, Nodename, Q) ->
+    Info = rabbit_ct_broker_helpers:rpc(Config, Nodename,
+      rabbit_amqqueue, info_all, [<<"/">>, [name, slave_pids]]),
+    [SPids] = [SPids || [{name, Q1}, {slave_pids, SPids}] <- Info,
+                        Q =:= Q1],
+    SPids.
+
+%%----------------------------------------------------------------------------
diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl
new file mode 100644
index 0000000000..e77f27f14b
--- /dev/null
+++ b/test/queue_master_location_SUITE.erl
@@ -0,0 +1,271 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License at
+%% http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+%% License for the specific language governing rights and limitations
+%% under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2011-2015 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(queue_master_location_SUITE).
+
+%% These tests use an ABC cluster with each node initialised with
+%% a different number of queues. When a queue is declared, different
+%% strategies can be applied to determine the queue's master node. Queue
+%% location strategies can be applied in the following ways:
+%%   1. As policy,
+%%   2. As config (in rabbitmq.config),
+%%   3. or as part of the queue's declare arguments.
+%%
+%% Currently supported strategies are:
+%%   min-masters : The queue master node is calculated as the one with the
+%%                 least bound queues in the cluster.
+%%   client-local: The queue master node is the local node from which
+%%                 the declaration is being carried out.
+%%   random      : The queue master node is randomly selected.
+%%
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(DEFAULT_VHOST_PATH, (<<"/">>)).
+%% Name of the location policy set by set_location_policy/3 (a policy
+%% name, not a queue-name pattern — the pattern used is <<".*">>).
+-define(POLICY, <<"^qm.location$">>).
+
+all() ->
+    [
+      {group, cluster_size_3}
+    ].
+
+groups() ->
+    [
+      {cluster_size_3, [], [
+          declare_args,
+          declare_policy,
+          declare_config,
+          calculate_min_master,
+          calculate_random,
+          calculate_client_local
+        ]}
+    ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+init_per_suite(Config) ->
+    rabbit_ct_helpers:log_environment(),
+    rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+    rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Only records the desired cluster size; the actual cluster is built
+%% per testcase (see init_per_testcase/2 below).
+init_per_group(cluster_size_3, Config) ->
+    rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, 3} %% Replaced with a list of node names later.
+      ]).
+
+end_per_group(_, Config) ->
+    Config.
+
+%% Each testcase gets its own freshly clustered set of nodes, named
+%% after the testcase, with TCP ports offset per testcase number so
+%% parallel-running suites do not collide.
+init_per_testcase(Testcase, Config) ->
+    rabbit_ct_helpers:testcase_started(Config, Testcase),
+    ClusterSize = ?config(rmq_nodes_count, Config),
+    Nodenames = [
+      list_to_atom(rabbit_misc:format("~s-~b", [Testcase, I]))
+      || I <- lists:seq(1, ClusterSize)
+    ],
+    TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+    Config1 = rabbit_ct_helpers:set_config(Config, [
+        {rmq_nodes_count, Nodenames},
+        {rmq_nodes_clustered, true},
+        {rmq_nodename_suffix, Testcase},
+        {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+      ]),
+    rabbit_ct_helpers:run_steps(Config1,
+      rabbit_ct_broker_helpers:setup_steps() ++
+      rabbit_ct_client_helpers:setup_steps()).
+
+end_per_testcase(Testcase, Config) ->
+    Config1 = rabbit_ct_helpers:run_steps(Config,
+      rabbit_ct_client_helpers:teardown_steps() ++
+      rabbit_ct_broker_helpers:teardown_steps()),
+    rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%%
+%% Queue 'declarations'
+%%
+
+%% Strategy supplied via the x-queue-master-locator declare argument.
+%% (The inline `Q= <<"qm.test">>` binds Q for the later verification.)
+declare_args(Config) ->
+    setup_test_environment(Config),
+    unset_location_config(Config),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    Args = [{<<"x-queue-master-locator">>, <<"min-masters">>}],
+    declare(Config, QueueName, false, false, Args, none),
+    verify_min_master(Config, Q).
+
+%% Strategy supplied via a queue-master-locator policy.
+declare_policy(Config) ->
+    setup_test_environment(Config),
+    unset_location_config(Config),
+    set_location_policy(Config, ?POLICY, <<"min-masters">>),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    declare(Config, QueueName, false, false, _Args=[], none),
+    verify_min_master(Config, Q).
+
+%% Strategy supplied via the queue_master_locator application env.
+declare_config(Config) ->
+    setup_test_environment(Config),
+    set_location_config(Config, <<"min-masters">>),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    declare(Config, QueueName, false, false, _Args=[], none),
+    verify_min_master(Config, Q),
+    unset_location_config(Config),
+    ok.
+
+%%
+%% Test 'calculations'
+%%
+
+%% min-masters must pick the node with the fewest queues (node 2).
+calculate_min_master(Config) ->
+    setup_test_environment(Config),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    Args = [{<<"x-queue-master-locator">>, <<"min-masters">>}],
+    declare(Config, QueueName, false, false, Args, none),
+    verify_min_master(Config, Q),
+    ok.
+
+%% random must pick some cluster member (any is acceptable).
+calculate_random(Config) ->
+    setup_test_environment(Config),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    Args = [{<<"x-queue-master-locator">>, <<"random">>}],
+    declare(Config, QueueName, false, false, Args, none),
+    verify_random(Config, Q),
+    ok.
+
+%% client-local must pick the node the declaration was made on.
+calculate_client_local(Config) ->
+    setup_test_environment(Config),
+    QueueName = rabbit_misc:r(<<"/">>, queue, Q= <<"qm.test">>),
+    Args = [{<<"x-queue-master-locator">>, <<"client-local">>}],
+    declare(Config, QueueName, false, false, Args, none),
+    verify_client_local(Config, Q),
+    ok.
+
+%%
+%% Setup environment
+%%
+
+%% Pre-load every node with a known, uneven number of queues so the
+%% min-masters strategy has a deterministic winner.
+setup_test_environment(Config) ->
+    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    [distribute_queues(Config, Node) || Node <- Nodes],
+    ok.
+
+%% Declare 15/8/1 queues on nodes 0/1/2 respectively, plus an e2e
+%% binding, leaving the channel open for the lifetime of the node.
+%% NOTE(review): this unsets env key 'queue_master_location' while
+%% set_location_config/unset_location_config below use
+%% 'queue_master_locator' — looks like a typo making this unset a
+%% no-op; confirm the intended key name.
+distribute_queues(Config, Node) ->
+    ok = rpc:call(Node, application, unset_env, [rabbit, queue_master_location]),
+    Count = case rabbit_ct_broker_helpers:nodename_to_index(Config, Node) of
+        0 -> 15;
+        1 -> 8;
+        2 -> 1
+    end,
+
+    Channel = rabbit_ct_client_helpers:open_channel(Config, Node),
+    ok = declare_queues(Channel, declare_fun(), Count),
+    ok = create_e2e_binding(Channel, [<< "ex_1" >>, << "ex_2" >>]),
+    {ok, Channel}.
+
+%%
+%% Internal queue handling
+%%
+
+%% Apply DeclareFun N times on the channel (N >= 1).
+declare_queues(Channel, DeclareFun, 1) -> DeclareFun(Channel);
+declare_queues(Channel, DeclareFun, N) ->
+    DeclareFun(Channel),
+    declare_queues(Channel, DeclareFun, N-1).
+
+declare_exchange(Channel, Ex) ->
+    #'exchange.declare_ok'{} =
+        amqp_channel:call(Channel, #'exchange.declare'{exchange = Ex}),
+    {ok, Ex}.
+
+declare_binding(Channel, Binding) ->
+    #'exchange.bind_ok'{} = amqp_channel:call(Channel, Binding),
+    ok.
+
+%% Returns a fun declaring one server-named exclusive queue.
+declare_fun() ->
+    fun(Channel) ->
+        #'queue.declare_ok'{} = amqp_channel:call(Channel, get_random_queue_declare()),
+        ok
+    end.
+
+%% Bind the first named exchange to the second (exchange-to-exchange).
+create_e2e_binding(Channel, ExNamesBin) ->
+    [{ok, Ex1}, {ok, Ex2}] = [declare_exchange(Channel, Ex) || Ex <- ExNamesBin],
+    Binding = #'exchange.bind'{source = Ex1, destination = Ex2},
+    ok = declare_binding(Channel, Binding).
+
+%% Declaration template: transient, exclusive, server-named queue.
+get_random_queue_declare() ->
+    #'queue.declare'{passive = false,
+                     durable = false,
+                     exclusive = true,
+                     auto_delete = false,
+                     nowait = false,
+                     arguments = []}.
+
+%%
+%% Internal helper functions
+%%
+
+%% NOTE(review): not referenced anywhere in this suite (only reachable
+%% via export_all) — possibly dead code.
+get_cluster() -> [node()|nodes()].
+
+%% The last configured node (node 2) — the one distribute_queues/2
+%% loaded with the fewest queues, hence the expected min-masters pick.
+min_master_node(Config) ->
+    hd(lists:reverse(
+      rabbit_ct_broker_helpers:get_node_configs(Config, nodename))).
+
+%% Set the queue_master_locator application env on every node.
+set_location_config(Config, Strategy) ->
+    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    [ok = rpc:call(Node, application, set_env,
+      [rabbit, queue_master_locator, Strategy]) || Node <- Nodes],
+    ok.
+
+%% Remove the queue_master_locator application env from every node.
+unset_location_config(Config) ->
+    Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+    [ok = rpc:call(Node, application, unset_env,
+      [rabbit, queue_master_locator]) || Node <- Nodes],
+    ok.
+
+%% Declare via rabbit_amqqueue on node 0; asserts the queue is new.
+declare(Config, QueueName, Durable, AutoDelete, Args, Owner) ->
+    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    {new, Queue} = rpc:call(Node, rabbit_amqqueue, declare,
+      [QueueName, Durable, AutoDelete, Args, Owner]),
+    Queue.
+
+%% Assert Q's master is the expected min-masters node.
+verify_min_master(Config, Q) ->
+    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    MinMaster = min_master_node(Config),
+    {ok, MinMaster} = rpc:call(Node, rabbit_queue_master_location_misc,
+      lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
+
+%% Assert Q's master is some member of the cluster.
+verify_random(Config, Q) ->
+    [Node | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config,
+      nodename),
+    {ok, Master} = rpc:call(Node, rabbit_queue_master_location_misc,
+      lookup_master, [Q, ?DEFAULT_VHOST_PATH]),
+    true = lists:member(Master, Nodes).
+
+%% Assert Q's master is the node that declared it (node 0).
+verify_client_local(Config, Q) ->
+    Node = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+    {ok, Node} = rpc:call(Node, rabbit_queue_master_location_misc,
+      lookup_master, [Q, ?DEFAULT_VHOST_PATH]).
+
+%% Install a queue-master-locator policy matching all queues.
+set_location_policy(Config, Name, Strategy) ->
+    ok = rabbit_ct_broker_helpers:set_policy(Config, 0,
+      Name, <<".*">>, <<"queues">>, [{<<"queue-master-locator">>, Strategy}]).
diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl
new file mode 100644
index 0000000000..f374863f6a
--- /dev/null
+++ b/test/rabbit_ha_test_consumer.erl
@@ -0,0 +1,114 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_ha_test_consumer).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-export([await_response/1, create/5, start/6]).
+
+%% Blocks until the consumer process reports back, then either returns
+%% ok or raises the reported error. NOTE(review): there is no `after'
+%% clause, so this waits forever if the consumer dies silently — the
+%% spawn_link in create/5 is what propagates such failures instead.
+await_response(ConsumerPid) ->
+ case receive {ConsumerPid, Response} -> Response end of
+ {error, Reason} -> erlang:error(Reason);
+ ok -> ok
+ end.
+
+%% Spawns (linked) a consumer process running start/6 and subscribes it
+%% to Queue. LowestSeen starts at ExpectingMsgs + 1 because the producer
+%% publishes payloads counting down from ExpectingMsgs to 1.
+create(Channel, Queue, TestPid, CancelOnFailover, ExpectingMsgs) ->
+ ConsumerPid = spawn_link(?MODULE, start,
+ [TestPid, Channel, Queue, CancelOnFailover,
+ ExpectingMsgs + 1, ExpectingMsgs]),
+ amqp_channel:subscribe(
+ Channel, consume_method(Queue, CancelOnFailover), ConsumerPid),
+ ConsumerPid.
+
+%% Consumer entry point (also re-entered after resubscribe/6): logs the
+%% current expectations and drops into the receive loop.
+start(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
+ error_logger:info_msg("consumer ~p on ~p awaiting ~w messages "
+ "(lowest seen = ~w, cancel-on-failover = ~w)~n",
+ [self(), Channel, MsgsToConsume, LowestSeen,
+ CancelOnFailover]),
+ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume).
+
+%% Consumer receive loop. Counts down MsgsToConsume as new (strictly
+%% descending) payload numbers arrive, tolerating out-of-order
+%% redeliveries caused by failover requeues; replies ok to the test
+%% process when the count reaches zero.
+run(TestPid, _Channel, _Queue, _CancelOnFailover, _LowestSeen, 0) ->
+ consumer_reply(TestPid, ok);
+run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
+ receive
+ #'basic.consume_ok'{} ->
+ run(TestPid, Channel, Queue,
+ CancelOnFailover, LowestSeen, MsgsToConsume);
+ {Delivery = #'basic.deliver'{ redelivered = Redelivered },
+ #amqp_msg{payload = Payload}} ->
+ %% payloads are the integers MsgsToSend..1, as text
+ MsgNum = list_to_integer(binary_to_list(Payload)),
+
+ ack(Delivery, Channel),
+
+ %% we can receive any message we've already seen and,
+ %% because of the possibility of multiple requeuings, we
+ %% might see these messages in any order. If we are seeing
+ %% a message again, we don't decrement the MsgsToConsume
+ %% counter.
+ if
+ MsgNum + 1 == LowestSeen ->
+ %% the expected next message in the sequence
+ run(TestPid, Channel, Queue,
+ CancelOnFailover, MsgNum, MsgsToConsume - 1);
+ MsgNum >= LowestSeen ->
+ error_logger:info_msg(
+ "consumer ~p on ~p ignoring redelivered msg ~p~n",
+ [self(), Channel, MsgNum]),
+ true = Redelivered, %% ASSERTION
+ run(TestPid, Channel, Queue,
+ CancelOnFailover, LowestSeen, MsgsToConsume);
+ true ->
+ %% We received a message we haven't seen before,
+ %% but it is not the next message in the expected
+ %% sequence.
+ consumer_reply(TestPid,
+ {error, {unexpected_message, MsgNum}})
+ end;
+ #'basic.cancel'{} when CancelOnFailover ->
+ error_logger:info_msg("consumer ~p on ~p received basic.cancel: "
+ "resubscribing to ~p on ~p~n",
+ [self(), Channel, Queue, Channel]),
+ resubscribe(TestPid, Channel, Queue, CancelOnFailover,
+ LowestSeen, MsgsToConsume);
+ #'basic.cancel'{} ->
+ %% cancel arrived although x-cancel-on-ha-failover was false
+ exit(cancel_received_without_cancel_on_failover)
+ end.
+
+%%
+%% Private API
+%%
+
+%% Re-issues basic.consume after a failover-triggered basic.cancel,
+%% waits (indefinitely) for the consume-ok, then resumes the consume
+%% loop with the state we had.
+resubscribe(TestPid, Channel, Queue, CancelOnFailover, LowestSeen,
+ MsgsToConsume) ->
+ amqp_channel:subscribe(
+ Channel, consume_method(Queue, CancelOnFailover), self()),
+ ok = receive #'basic.consume_ok'{} -> ok
+ end,
+ error_logger:info_msg("re-subscription of consumer ~p on ~p complete "
+ "(received basic.consume_ok)~n",
+ [self(), Channel]),
+ start(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume).
+
+%% Builds the basic.consume method, carrying the RabbitMQ-specific
+%% x-cancel-on-ha-failover argument so the broker sends basic.cancel
+%% when the queue's master fails over.
+consume_method(Queue, CancelOnFailover) ->
+ Args = [{<<"x-cancel-on-ha-failover">>, bool, CancelOnFailover}],
+ #'basic.consume'{queue = Queue,
+ arguments = Args}.
+
+%% Synchronously acks a single delivery on Channel.
+ack(#'basic.deliver'{delivery_tag = DeliveryTag}, Channel) ->
+ amqp_channel:call(Channel, #'basic.ack'{delivery_tag = DeliveryTag}),
+ ok.
+
+%% Sends {self(), Reply} to the test process; matched by await_response/1.
+consumer_reply(TestPid, Reply) ->
+ TestPid ! {self(), Reply}.
diff --git a/test/rabbit_ha_test_producer.erl b/test/rabbit_ha_test_producer.erl
new file mode 100644
index 0000000000..66dee3f7a3
--- /dev/null
+++ b/test/rabbit_ha_test_producer.erl
@@ -0,0 +1,119 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+-module(rabbit_ha_test_producer).
+
+-export([await_response/1, start/5, create/5]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+%% Blocks until the producer reports back; exits on any non-ok reply.
+%% No `after' clause: a hung producer is surfaced by the CT timetrap.
+await_response(ProducerPid) ->
+ error_logger:info_msg("waiting for producer pid ~p~n", [ProducerPid]),
+ case receive {ProducerPid, Response} -> Response end of
+ ok -> ok;
+ {error, _} = Else -> exit(Else);
+ Else -> exit({weird_response, Else})
+ end.
+
+%% Spawns (linked) a producer running start/5 and waits for its
+%% `started' handshake before returning the pid, so callers know
+%% confirm mode is already set up.
+create(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+ ProducerPid = spawn_link(?MODULE, start, [Channel, Queue, TestPid,
+ Confirm, MsgsToSend]),
+ receive
+ {ProducerPid, started} -> ProducerPid
+ end.
+
+%% Producer entry point. When Confirm is true, registers this process
+%% as the channel's confirm handler and enables publisher confirms;
+%% ConfirmState is then a gb_tree of unconfirmed seqno -> msg number,
+%% otherwise the atom `none'. Signals `started' before publishing.
+start(Channel, Queue, TestPid, Confirm, MsgsToSend) ->
+ ConfirmState =
+ case Confirm of
+ true -> amqp_channel:register_confirm_handler(Channel, self()),
+ #'confirm.select_ok'{} =
+ amqp_channel:call(Channel, #'confirm.select'{}),
+ gb_trees:empty();
+ false -> none
+ end,
+ TestPid ! {self(), started},
+ error_logger:info_msg("publishing ~w msgs on ~p~n", [MsgsToSend, Channel]),
+ producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend).
+
+%%
+%% Private API
+%%
+
+%% Publish loop: sends persistent messages with payloads counting down
+%% from MsgsToSend to 1 via the default exchange. On reaching 0, either
+%% replies ok immediately (no confirms) or drains outstanding confirms
+%% and reports nacks / missing confirms to the test process.
+producer(_Channel, _Queue, TestPid, none, 0) ->
+ TestPid ! {self(), ok};
+producer(Channel, _Queue, TestPid, ConfirmState, 0) ->
+ error_logger:info_msg("awaiting confirms on channel ~p~n", [Channel]),
+ Msg = case drain_confirms(no_nacks, ConfirmState) of
+ no_nacks -> ok;
+ nacks -> {error, received_nacks};
+ {Nacks, CS} -> {error, {missing_confirms, Nacks,
+ lists:sort(gb_trees:keys(CS))}}
+ end,
+ TestPid ! {self(), Msg};
+
+producer(Channel, Queue, TestPid, ConfirmState, MsgsToSend) ->
+ Method = #'basic.publish'{exchange = <<"">>,
+ routing_key = Queue,
+ mandatory = false,
+ immediate = false},
+
+ %% record the seqno BEFORE publishing so the confirm can be matched
+ ConfirmState1 = maybe_record_confirm(ConfirmState, Channel, MsgsToSend),
+
+ %% delivery_mode = 2: persistent, so messages survive broker restarts
+ amqp_channel:call(Channel, Method,
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2},
+ payload = list_to_binary(
+ integer_to_list(MsgsToSend))}),
+
+ producer(Channel, Queue, TestPid, ConfirmState1, MsgsToSend - 1).
+
+%% No-op when confirms are disabled; otherwise records the next publish
+%% seqno against the message number in the pending-confirms tree.
+maybe_record_confirm(none, _, _) ->
+ none;
+maybe_record_confirm(ConfirmState, Channel, MsgsToSend) ->
+ SeqNo = amqp_channel:next_publish_seqno(Channel),
+ gb_trees:insert(SeqNo, MsgsToSend, ConfirmState).
+
+%% Waits for acks/nacks until the pending tree is empty. Returns the
+%% Nacks marker (`no_nacks' or `nacks') on success, or, after a 60s
+%% silence, {Nacks, RemainingTree} identifying the missing confirms.
+drain_confirms(Nacks, ConfirmState) ->
+ case gb_trees:is_empty(ConfirmState) of
+ true -> Nacks;
+ false -> receive
+ #'basic.ack'{delivery_tag = DeliveryTag,
+ multiple = IsMulti} ->
+ drain_confirms(Nacks,
+ delete_confirms(DeliveryTag, IsMulti,
+ ConfirmState));
+ #'basic.nack'{delivery_tag = DeliveryTag,
+ multiple = IsMulti} ->
+ %% any nack flips the marker permanently to `nacks'
+ drain_confirms(nacks,
+ delete_confirms(DeliveryTag, IsMulti,
+ ConfirmState))
+ after
+ 60000 -> {Nacks, ConfirmState}
+ end
+ end.
+
+%% Removes confirmed seqnos: a single exact tag when multiple = false,
+%% or every seqno =< DeliveryTag when multiple = true (AMQP semantics).
+delete_confirms(DeliveryTag, false, ConfirmState) ->
+ gb_trees:delete(DeliveryTag, ConfirmState);
+delete_confirms(DeliveryTag, true, ConfirmState) ->
+ multi_confirm(DeliveryTag, ConfirmState).
+
+%% Pops entries in ascending key order while the smallest pending seqno
+%% is =< DeliveryTag; gb_trees keeps this O(log n) per removal.
+multi_confirm(DeliveryTag, ConfirmState) ->
+ case gb_trees:is_empty(ConfirmState) of
+ true -> ConfirmState;
+ false -> {Key, _, ConfirmState1} = gb_trees:take_smallest(ConfirmState),
+ case Key =< DeliveryTag of
+ true -> multi_confirm(DeliveryTag, ConfirmState1);
+ false -> ConfirmState
+ end
+ end.
diff --git a/test/simple_ha_SUITE.erl b/test/simple_ha_SUITE.erl
new file mode 100644
index 0000000000..af85ad6d3b
--- /dev/null
+++ b/test/simple_ha_SUITE.erl
@@ -0,0 +1,216 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(simple_ha_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+%% common_test entry point: run both cluster-size groups.
+all() ->
+ [
+ {group, cluster_size_2},
+ {group, cluster_size_3}
+ ].
+
+%% 2-node tests exercise declaration semantics; 3-node tests exercise
+%% consumer/producer survival across master death (stop, kill, policy).
+groups() ->
+ [
+ {cluster_size_2, [], [
+ rapid_redeclare,
+ declare_synchrony
+ ]},
+ {cluster_size_3, [], [
+ consume_survives_stop,
+ consume_survives_sigkill,
+ consume_survives_policy,
+ auto_resume,
+ auto_resume_no_ccn_client,
+ confirms_survive_stop,
+ confirms_survive_sigkill,
+ confirms_survive_policy
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Standard rabbitmq-ct-helpers suite bracket: log the environment and
+%% run the shared setup/teardown step chains.
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+%% Record the cluster size for the group; nodes are actually started
+%% per-testcase (see init_per_testcase/2), so teardown is a no-op.
+init_per_group(cluster_size_2, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 2}
+ ]);
+init_per_group(cluster_size_3, Config) ->
+ rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_count, 3}
+ ]).
+
+end_per_group(_, Config) ->
+ Config.
+
+%% Boots a fresh clustered broker set for each testcase. TCP ports are
+%% offset by TestNumber * ClusterSize so concurrent/consecutive cases
+%% never collide, and an HA "all" policy is applied as a final step.
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, true},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++ [
+ fun rabbit_ct_broker_helpers:set_ha_policy_all/1
+ ]).
+
+%% Tears down clients then brokers (reverse of setup order).
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Declare/delete the same durable queue 20 times in quick succession;
+%% the synchronous calls crash if the broker mishandles the churn.
+rapid_redeclare(Config) ->
+ A = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename),
+ Ch = rabbit_ct_client_helpers:open_channel(Config, A),
+ Queue = <<"test">>,
+ [begin
+ amqp_channel:call(Ch, #'queue.declare'{queue = Queue,
+ durable = true}),
+ amqp_channel:call(Ch, #'queue.delete'{queue = Queue})
+ end || _I <- lists:seq(1, 20)],
+ ok.
+
+%% Check that by the time we get a declare-ok back, the slaves are up
+%% and in Mnesia: publish one confirmed message, kill the master, and
+%% expect the redeclare on the surviving node to still see the message.
+declare_synchrony(Config) ->
+ [Rabbit, Hare] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ RabbitCh = rabbit_ct_client_helpers:open_channel(Config, Rabbit),
+ HareCh = rabbit_ct_client_helpers:open_channel(Config, Hare),
+ Q = <<"mirrored-queue">>,
+ declare(RabbitCh, Q),
+ amqp_channel:call(RabbitCh, #'confirm.select'{}),
+ %% persistent publish, confirmed before we kill the master
+ amqp_channel:cast(RabbitCh, #'basic.publish'{routing_key = Q},
+ #amqp_msg{props = #'P_basic'{delivery_mode = 2}}),
+ amqp_channel:wait_for_confirms(RabbitCh),
+ rabbit_ct_broker_helpers:kill_node(Config, Rabbit),
+
+ %% message_count = 1 asserts the slave had the message when promoted
+ #'queue.declare_ok'{message_count = 1} = declare(HareCh, Q),
+ ok.
+
+%% Idempotent durable declare; returns the queue.declare_ok record.
+declare(Ch, Name) ->
+ amqp_channel:call(Ch, #'queue.declare'{durable = true, queue = Name}).
+
+%% Testcase wrappers: pair a survival scenario with a way of killing
+%% the master (clean stop / SIGKILL / policy-driven failover) and with
+%% whether the client understands consumer-cancel-notify (ccn).
+consume_survives_stop(Cf) -> consume_survives(Cf, fun stop/2, true).
+consume_survives_sigkill(Cf) -> consume_survives(Cf, fun sigkill/2, true).
+consume_survives_policy(Cf) -> consume_survives(Cf, fun policy/2, true).
+auto_resume(Cf) -> consume_survives(Cf, fun sigkill/2, false).
+auto_resume_no_ccn_client(Cf) -> consume_survives(Cf, fun sigkill/2, false,
+ false).
+
+confirms_survive_stop(Cf) -> confirms_survive(Cf, fun stop/2).
+confirms_survive_sigkill(Cf) -> confirms_survive(Cf, fun sigkill/2).
+confirms_survive_policy(Cf) -> confirms_survive(Cf, fun policy/2).
+
+%%----------------------------------------------------------------------------
+
+%% Core consume-survival scenario: declare a mirrored queue on node A,
+%% consume from node B, publish from node C, kill A mid-flight via
+%% DeathFun, and assert that every message is still consumed.
+consume_survives(Config, DeathFun, CancelOnFailover) ->
+ consume_survives(Config, DeathFun, CancelOnFailover, true).
+
+consume_survives(Config,
+ DeathFun, CancelOnFailover, CCNSupported) ->
+ [A, B, C] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000),
+ Channel1 = rabbit_ct_client_helpers:open_channel(Config, A),
+ Channel2 = rabbit_ct_client_helpers:open_channel(Config, B),
+ Channel3 = rabbit_ct_client_helpers:open_channel(Config, C),
+
+ %% declare the queue on the master, mirrored to the two slaves
+ Queue = <<"test">>,
+ amqp_channel:call(Channel1, #'queue.declare'{queue = Queue,
+ auto_delete = false}),
+
+ %% start up a consumer
+ ConsCh = case CCNSupported of
+ true -> Channel2;
+ false -> Port = rabbit_ct_broker_helpers:get_node_config(
+ Config, B, tcp_port_amqp),
+ open_incapable_channel(Port)
+ end,
+ ConsumerPid = rabbit_ha_test_consumer:create(
+ ConsCh, Queue, self(), CancelOnFailover, Msgs),
+
+ %% send a bunch of messages from the producer
+ ProducerPid = rabbit_ha_test_producer:create(Channel3, Queue,
+ self(), false, Msgs),
+ DeathFun(Config, A),
+ %% verify that the consumer got all msgs, or die - the await_response
+ %% calls throw an exception if anything goes wrong....
+ rabbit_ha_test_consumer:await_response(ConsumerPid),
+ rabbit_ha_test_producer:await_response(ProducerPid),
+ ok.
+
+%% Publisher-confirm survival scenario: declare a durable mirrored
+%% queue on node A, publish with confirms from node B, kill A via
+%% DeathFun, and assert every publish is eventually confirmed.
+confirms_survive(Config, DeathFun) ->
+ [A, B, _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ Msgs = rabbit_ct_helpers:cover_work_factor(Config, 20000),
+ Node1Channel = rabbit_ct_client_helpers:open_channel(Config, A),
+ Node2Channel = rabbit_ct_client_helpers:open_channel(Config, B),
+
+ %% declare the queue on the master, mirrored to the two slaves
+ Queue = <<"test">>,
+ amqp_channel:call(Node1Channel,#'queue.declare'{queue = Queue,
+ auto_delete = false,
+ durable = true}),
+
+ %% send a bunch of messages from the producer
+ ProducerPid = rabbit_ha_test_producer:create(Node2Channel, Queue,
+ self(), true, Msgs),
+ DeathFun(Config, A),
+ rabbit_ha_test_producer:await_response(ProducerPid),
+ ok.
+
+%% DeathFun implementations. stop/2 and sigkill/2 take the node down
+%% 50ms later (while traffic is in flight); policy/2 instead re-points
+%% the HA "nodes" policy away from Node, forcing a policy failover.
+stop(Config, Node) ->
+ rabbit_ct_broker_helpers:stop_node_after(Config, Node, 50).
+
+sigkill(Config, Node) ->
+ rabbit_ct_broker_helpers:kill_node_after(Config, Node, 50).
+
+policy(Config, Node)->
+ Nodes = [
+ rabbit_misc:atom_to_binary(N)
+ || N <- rabbit_ct_broker_helpers:get_node_configs(Config, nodename),
+ N =/= Node],
+ rabbit_ct_broker_helpers:set_ha_policy(Config, Node, <<".*">>,
+ {<<"nodes">>, Nodes}).
+
+%% Opens a channel on a connection that advertises an empty
+%% capabilities table, i.e. a client that does NOT support
+%% consumer-cancel-notify (used by auto_resume_no_ccn_client).
+open_incapable_channel(NodePort) ->
+ Props = [{<<"capabilities">>, table, []}],
+ {ok, ConsConn} =
+ amqp_connection:start(#amqp_params_network{port = NodePort,
+ client_properties = Props}),
+ {ok, Ch} = amqp_connection:open_channel(ConsConn),
+ Ch.
diff --git a/test/sync_detection_SUITE.erl b/test/sync_detection_SUITE.erl
new file mode 100644
index 0000000000..1e0a66e8fd
--- /dev/null
+++ b/test/sync_detection_SUITE.erl
@@ -0,0 +1,252 @@
+%% The contents of this file are subject to the Mozilla Public License
+%% Version 1.1 (the "License"); you may not use this file except in
+%% compliance with the License. You may obtain a copy of the License
+%% at http://www.mozilla.org/MPL/
+%%
+%% Software distributed under the License is distributed on an "AS IS"
+%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See
+%% the License for the specific language governing rights and
+%% limitations under the License.
+%%
+%% The Original Code is RabbitMQ.
+%%
+%% The Initial Developer of the Original Code is GoPivotal, Inc.
+%% Copyright (c) 2007-2016 Pivotal Software, Inc. All rights reserved.
+%%
+
+-module(sync_detection_SUITE).
+
+-include_lib("common_test/include/ct.hrl").
+-include_lib("amqp_client/include/amqp_client.hrl").
+
+-compile(export_all).
+
+-define(LOOP_RECURSION_DELAY, 100).
+
+%% common_test entry point: the basic sync-detection test needs two
+%% nodes; the TTL variant needs a third node to host the DLX queue.
+all() ->
+ [
+ {group, cluster_size_2},
+ {group, cluster_size_3}
+ ].
+
+groups() ->
+ [
+ {cluster_size_2, [], [
+ slave_synchronization
+ ]},
+ {cluster_size_3, [], [
+ slave_synchronization_ttl
+ ]}
+ ].
+
+%% -------------------------------------------------------------------
+%% Testsuite setup/teardown.
+%% -------------------------------------------------------------------
+
+%% Standard rabbitmq-ct-helpers suite/group brackets; the group hook
+%% only records the cluster size, nodes start per testcase.
+init_per_suite(Config) ->
+ rabbit_ct_helpers:log_environment(),
+ rabbit_ct_helpers:run_setup_steps(Config).
+
+end_per_suite(Config) ->
+ rabbit_ct_helpers:run_teardown_steps(Config).
+
+init_per_group(cluster_size_2, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 2}]);
+init_per_group(cluster_size_3, Config) ->
+ rabbit_ct_helpers:set_config(Config, [{rmq_nodes_count, 3}]).
+
+end_per_group(_, Config) ->
+ Config.
+
+%% Boots a fresh clustered broker set per testcase, offsetting TCP
+%% ports by TestNumber * ClusterSize to avoid collisions, then installs
+%% the "two-pos" HA policies used by the sync-detection tests.
+%% (rmq_nodes_count is already in Config from init_per_group/2 —
+%% re-setting it to its own value was redundant and has been dropped,
+%% matching simple_ha_SUITE's identical function.)
+init_per_testcase(Testcase, Config) ->
+ rabbit_ct_helpers:testcase_started(Config, Testcase),
+ ClusterSize = ?config(rmq_nodes_count, Config),
+ TestNumber = rabbit_ct_helpers:testcase_number(Config, ?MODULE, Testcase),
+ Config1 = rabbit_ct_helpers:set_config(Config, [
+ {rmq_nodes_clustered, true},
+ {rmq_nodename_suffix, Testcase},
+ {tcp_ports_base, {skip_n_nodes, TestNumber * ClusterSize}}
+ ]),
+ rabbit_ct_helpers:run_steps(Config1,
+ rabbit_ct_broker_helpers:setup_steps() ++
+ rabbit_ct_client_helpers:setup_steps() ++ [
+ fun rabbit_ct_broker_helpers:set_ha_policy_two_pos/1,
+ fun rabbit_ct_broker_helpers:set_ha_policy_two_pos_batch_sync/1
+ ]).
+
+%% Tears down clients then brokers (reverse of setup order).
+end_per_testcase(Testcase, Config) ->
+ Config1 = rabbit_ct_helpers:run_steps(Config,
+ rabbit_ct_client_helpers:teardown_steps() ++
+ rabbit_ct_broker_helpers:teardown_steps()),
+ rabbit_ct_helpers:testcase_finished(Config1, Testcase).
+
+%% -------------------------------------------------------------------
+%% Testcases.
+%% -------------------------------------------------------------------
+
+%% Verifies that a restarted slave is only reported as synchronised
+%% once it has seen every message AND every pending ack, by stopping/
+%% starting the slave around individual publish/get/ack/reject steps.
+slave_synchronization(Config) ->
+ [Master, Slave] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ Channel = rabbit_ct_client_helpers:open_channel(Config, Master),
+ Queue = <<"ha.two.test">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
+ auto_delete = false}),
+
+ %% The comments on the right are the queue length and the pending acks on
+ %% the master.
+ rabbit_ct_broker_helpers:stop_broker(Config, Slave),
+
+ %% We get and ack one message when the slave is down, and check that when we
+ %% start the slave it's not marked as synced until we ack the message. We
+ %% also publish another message when the slave is up.
+ send_dummy_message(Channel, Queue), % 1 - 0
+ {#'basic.get_ok'{delivery_tag = Tag1}, _} =
+ amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 0 - 1
+
+ rabbit_ct_broker_helpers:start_broker(Config, Slave),
+
+ slave_unsynced(Master, Queue),
+ send_dummy_message(Channel, Queue), % 1 - 1
+ slave_unsynced(Master, Queue),
+
+ amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag1}), % 1 - 0
+
+ slave_synced(Master, Queue),
+
+ %% We restart the slave and we send a message, so that the slave will only
+ %% have one of the messages.
+ rabbit_ct_broker_helpers:stop_broker(Config, Slave),
+ rabbit_ct_broker_helpers:start_broker(Config, Slave),
+
+ send_dummy_message(Channel, Queue), % 2 - 0
+
+ slave_unsynced(Master, Queue),
+
+ %% We reject the message that the slave doesn't have, and verify that it's
+ %% still unsynced
+ {#'basic.get_ok'{delivery_tag = Tag2}, _} =
+ amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 1 - 1
+ slave_unsynced(Master, Queue),
+ amqp_channel:cast(Channel, #'basic.reject'{ delivery_tag = Tag2,
+ requeue = true }), % 2 - 0
+ slave_unsynced(Master, Queue),
+ {#'basic.get_ok'{delivery_tag = Tag3}, _} =
+ amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 1 - 1
+ amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag3}), % 1 - 0
+ slave_synced(Master, Queue),
+ {#'basic.get_ok'{delivery_tag = Tag4}, _} =
+ amqp_channel:call(Channel, #'basic.get'{queue = Queue}), % 0 - 1
+ amqp_channel:cast(Channel, #'basic.ack'{delivery_tag = Tag4}), % 0 - 0
+ slave_synced(Master, Queue).
+
+%% Verifies that messages expiring via per-queue TTL (observed through
+%% a dead-letter queue on a third node) bring a restarted slave back in
+%% sync, across slave-knows-none / knows-one / knows-all scenarios.
+slave_synchronization_ttl(Config) ->
+ [Master, Slave, DLX] = rabbit_ct_broker_helpers:get_node_configs(Config,
+ nodename),
+ Channel = rabbit_ct_client_helpers:open_channel(Config, Master),
+ DLXChannel = rabbit_ct_client_helpers:open_channel(Config, DLX),
+
+ %% We declare a DLX queue to wait for messages to be TTL'ed
+ DLXQueue = <<"dlx-queue">>,
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = DLXQueue,
+ auto_delete = false}),
+
+ TestMsgTTL = 5000,
+ Queue = <<"ha.two.test">>,
+ %% Sadly we need fairly high numbers for the TTL because starting/stopping
+ %% nodes takes a fair amount of time.
+ Args = [{<<"x-message-ttl">>, long, TestMsgTTL},
+ {<<"x-dead-letter-exchange">>, longstr, <<>>},
+ {<<"x-dead-letter-routing-key">>, longstr, DLXQueue}],
+ #'queue.declare_ok'{} =
+ amqp_channel:call(Channel, #'queue.declare'{queue = Queue,
+ auto_delete = false,
+ arguments = Args}),
+
+ slave_synced(Master, Queue),
+
+ %% All unknown
+ rabbit_ct_broker_helpers:stop_broker(Config, Slave),
+ send_dummy_message(Channel, Queue),
+ send_dummy_message(Channel, Queue),
+ rabbit_ct_broker_helpers:start_broker(Config, Slave),
+ slave_unsynced(Master, Queue),
+ wait_for_messages(DLXQueue, DLXChannel, 2),
+ slave_synced(Master, Queue),
+
+ %% 1 unknown, 1 known
+ rabbit_ct_broker_helpers:stop_broker(Config, Slave),
+ send_dummy_message(Channel, Queue),
+ rabbit_ct_broker_helpers:start_broker(Config, Slave),
+ slave_unsynced(Master, Queue),
+ send_dummy_message(Channel, Queue),
+ slave_unsynced(Master, Queue),
+ wait_for_messages(DLXQueue, DLXChannel, 2),
+ slave_synced(Master, Queue),
+
+ %% %% both known
+ send_dummy_message(Channel, Queue),
+ send_dummy_message(Channel, Queue),
+ slave_synced(Master, Queue),
+ wait_for_messages(DLXQueue, DLXChannel, 2),
+ slave_synced(Master, Queue),
+
+ ok.
+
+%% Fire-and-forget publish of a fixed payload via the default exchange.
+send_dummy_message(Channel, Queue) ->
+ Payload = <<"foo">>,
+ Publish = #'basic.publish'{exchange = <<>>, routing_key = Queue},
+ amqp_channel:cast(Channel, Publish, #amqp_msg{payload = Payload}).
+
+%% Fetches the synchronised_slave_pids for Queue via RPC on Node.
+%% NOTE(review): the queue info reports the empty atom '' when there
+%% are no synchronised slaves, hence the special-case to [].
+slave_pids(Node, Queue) ->
+ {ok, Q} = rpc:call(Node, rabbit_amqqueue, lookup,
+ [rabbit_misc:r(<<"/">>, queue, Queue)]),
+ SSP = synchronised_slave_pids,
+ [{SSP, Pids}] = rpc:call(Node, rabbit_amqqueue, info, [Q, [SSP]]),
+ case Pids of
+ '' -> [];
+ _ -> Pids
+ end.
+
+%% The mnesia synchronisation takes a while; rather than waiting for
+%% the (quite high) CT timetrap to fire, poll the sync status every
+%% ?LOOP_RECURSION_DELAY ms for up to ~10 seconds and error out early.
+wait_for_sync_status(Status, Node, Queue) ->
+ Max = 10000 / ?LOOP_RECURSION_DELAY,
+ wait_for_sync_status(0, Max, Status, Node, Queue).
+
+wait_for_sync_status(N, Max, Status, Node, Queue) when N >= Max ->
+ erlang:error({sync_status_max_tries_failed,
+ [{queue, Queue},
+ {node, Node},
+ {expected_status, Status},
+ {max_tried, Max}]});
+wait_for_sync_status(N, Max, Status, Node, Queue) ->
+ %% "synced" here means exactly one synchronised slave pid is reported
+ Synced = length(slave_pids(Node, Queue)) =:= 1,
+ case Synced =:= Status of
+ true -> ok;
+ false -> timer:sleep(?LOOP_RECURSION_DELAY),
+ wait_for_sync_status(N + 1, Max, Status, Node, Queue)
+ end.
+
+%% Readability wrappers over wait_for_sync_status/3.
+slave_synced(Node, Queue) ->
+ wait_for_sync_status(true, Node, Queue).
+
+slave_unsynced(Node, Queue) ->
+ wait_for_sync_status(false, Node, Queue).
+
+%% Consumes and acks exactly N deliveries from Queue, then cancels the
+%% consumer. Blocks (no `after') until all N arrive; the CT timetrap
+%% bounds the wait if dead-lettering never happens.
+wait_for_messages(Queue, Channel, N) ->
+ Sub = #'basic.consume'{queue = Queue},
+ #'basic.consume_ok'{consumer_tag = CTag} = amqp_channel:call(Channel, Sub),
+ receive
+ #'basic.consume_ok'{} -> ok
+ end,
+ lists:foreach(
+ fun (_) -> receive
+ {#'basic.deliver'{delivery_tag = Tag}, _Content} ->
+ amqp_channel:cast(Channel,
+ #'basic.ack'{delivery_tag = Tag})
+ end
+ end, lists:seq(1, N)),
+ amqp_channel:call(Channel, #'basic.cancel'{consumer_tag = CTag}).