diff options
| author | Michael Klishin <michael@clojurewerkz.org> | 2019-02-12 23:28:53 +0300 |
|---|---|---|
| committer | Michael Klishin <michael@clojurewerkz.org> | 2019-02-12 23:28:53 +0300 |
| commit | 3efa9d81735ddbeb2e355699d01be02fd1ee7d06 (patch) | |
| tree | 7951c16b8495a1fa1a741e93e7858baa437b8dd2 | |
| parent | 19665bb3fd8e7c257703633f5550b35fb0775199 (diff) | |
| parent | cea04c52dcdcda16ecca5d471bdb7cc6ec16beb8 (diff) | |
| download | rabbitmq-server-git-3efa9d81735ddbeb2e355699d01be02fd1ee7d06.tar.gz | |
Merge branch 'master' into fix-more-dialyzer
47 files changed, 871 insertions, 740 deletions
diff --git a/docs/rabbitmq-service.8 b/docs/rabbitmq-service.8 index f9c5588435..ec02d98e15 100644 --- a/docs/rabbitmq-service.8 +++ b/docs/rabbitmq-service.8 @@ -115,7 +115,7 @@ Defaults to for 64-bit environments). This is the installation location of the Erlang service manager. .It Ev RABBITMQ_CONSOLE_LOG -Set this varable to +Set this variable to .Sy new or .Sy reuse to have the console output from the server redirected to a file named diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index 42aea4acf5..81958c89fd 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -804,7 +804,7 @@ ## See http://rabbitmq.com/logging.html and https://github.com/erlang-lager/lager for details. ## -## Log direcrory, taken from the RABBITMQ_LOG_BASE env variable by default. +## Log directory, taken from the RABBITMQ_LOG_BASE env variable by default. ## # log.dir = /var/log/rabbitmq diff --git a/docs/rabbitmq.config.example b/docs/rabbitmq.config.example index 9ee9a352be..eba4921c84 100644 --- a/docs/rabbitmq.config.example +++ b/docs/rabbitmq.config.example @@ -339,7 +339,7 @@ %% Enables flow control between queue mirrors. %% Disabling this can be dangerous and is not recommended. - %% When flow control is disablied, queue masters can outpace mirrors and not allow mirrors to catch up. + %% When flow control is disabled, queue masters can outpace mirrors and not allow mirrors to catch up. %% Mirrors will end up using increasingly more RAM, eventually triggering a memory alarm. %% %% {mirroring_flow_control, true}, @@ -909,7 +909,7 @@ %% {rabbit_channel_lager_event, [{handlers, [ %% {lager_forwarder_backend, %% [lager_event, info]}]}]}, - %% {rabbit_conection_lager_event, [{handlers, [ + %% {rabbit_connection_lager_event, [{handlers, [ %% {lager_forwarder_backend, %% [lager_event, info]}]}]}, %% {rabbit_mirroring_lager_event, [{handlers, [ diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index 6c08655acd..a087e80e85 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -156,7 +156,7 @@ To use this precompiled files, you should set .Ev RABBITMQ_SERVER_CODE_PATH environment variable to directory specified in .Cm hipe_compile -invokation. +invocation. .Pp For example, to HiPE-compile modules and store them to .Pa /tmp/rabbit-hipe/ebin @@ -255,7 +255,7 @@ For example, to instruct the RabbitMQ node to terminate: .Dl rabbitmqctl stop .\" ------------------------------------ .It Cm stop_app -Stops the RabbitMQ application, leaving the runtme (Erlang VM) running. +Stops the RabbitMQ application, leaving the runtime (Erlang VM) running. .Pp This command is typically run prior to performing other management actions that require the RabbitMQ application to be stopped, e.g.\& @@ -596,7 +596,7 @@ pkg_check_node_commit = master PACKAGES += chronos pkg_chronos_name = chronos -pkg_chronos_description = Timer module for Erlang that makes it easy to abstact time out of the tests. +pkg_chronos_description = Timer module for Erlang that makes it easy to abstract time out of the tests. pkg_chronos_homepage = https://github.com/lehoff/chronos pkg_chronos_fetch = git pkg_chronos_repo = https://github.com/lehoff/chronos @@ -1340,7 +1340,7 @@ pkg_erlang_cep_commit = master PACKAGES += erlang_js pkg_erlang_js_name = erlang_js -pkg_erlang_js_description = A linked-in driver for Erlang to Mozilla's Spidermonkey Javascript runtime. +pkg_erlang_js_description = A linked-in driver for Erlang to Mozilla's Spidermonkey JavaScript runtime. pkg_erlang_js_homepage = https://github.com/basho/erlang_js pkg_erlang_js_fetch = git pkg_erlang_js_repo = https://github.com/basho/erlang_js @@ -4132,7 +4132,7 @@ pkg_yaws_commit = master PACKAGES += zab_engine pkg_zab_engine_name = zab_engine -pkg_zab_engine_description = zab propotocol implement by erlang +pkg_zab_engine_description = zab protocol implement by erlang pkg_zab_engine_homepage = https://github.com/xinmingyao/zab_engine pkg_zab_engine_fetch = git pkg_zab_engine_repo = https://github.com/xinmingyao/zab_engine diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index f24e74c9ef..2617f558f7 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -903,7 +903,7 @@ fun(Conf) -> end end}. -%% Classic config-driven peer discuvery backend. +%% Classic config-driven peer discovery backend. %% %% Make clustering happen *automatically* at startup - only applied %% to nodes that have just been reset or started for the first time. @@ -1530,7 +1530,7 @@ fun(Size) when is_integer(Size) -> Size > 0 andalso Size < 536870912 end}. -{validator, "less_than_1", "Flooat is not beetween 0 and 1", +{validator, "less_than_1", "Float is not between 0 and 1", fun(Float) when is_float(Float) -> Float > 0 andalso Float < 1 end}. diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index 0cc6bd544a..a72365dc47 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -226,7 +226,7 @@ export base_rmq_ref # If cloning from this computed location fails, we fallback to RabbitMQ # upstream which is GitHub. -# Maccro to transform eg. "rabbit_common" to "rabbitmq-common". +# Macro to transform eg. "rabbit_common" to "rabbitmq-common". rmq_cmp_repo_name = $(word 2,$(dep_$(1))) # Upstream URL for the current project. diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index aeac38e46f..b4863057f0 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -219,7 +219,7 @@ fi # The defaults are meant to reduce RabbitMQ's memory usage and help # it reclaim memory at the cost of a slight decrease in performance # (due to an increase in memory operations). These defaults can be -# overriden using the RABBITMQ_SERVER_ERL_ARGS variable. +# overridden using the RABBITMQ_SERVER_ERL_ARGS variable. RABBITMQ_DEFAULT_ALLOC_ARGS="+MBas ageffcbf +MHas ageffcbf +MBlmbcs 512 +MHlmbcs 512 +MMmcs 30" ${ERL_DIR}erl ${RABBITMQ_DEFAULT_ALLOC_ARGS} \ @@ -374,7 +374,7 @@ else # This is considered an abnormal process termination. Normally, we # don't need to specify this exit code because the shell propagates it. # Unfortunately, the signal handler doesn't work as expected in Dash, - # thus we need to explicitely restate the exit code. + # thus we need to explicitly restate the exit code. trap "stop_rabbitmq_server; exit 0" HUP TERM TSTP trap "stop_rabbitmq_server; exit 130" INT diff --git a/scripts/rabbitmq-server.bat b/scripts/rabbitmq-server.bat index d417091732..462db4c10c 100644 --- a/scripts/rabbitmq-server.bat +++ b/scripts/rabbitmq-server.bat @@ -167,7 +167,7 @@ rem rem The defaults are meant to reduce RabbitMQ's memory usage and help
rem it reclaim memory at the cost of a slight decrease in performance
rem (due to an increase in memory operations). These defaults can be
-rem overriden using the RABBITMQ_SERVER_ERL_ARGS variable.
+rem overridden using the RABBITMQ_SERVER_ERL_ARGS variable.
set RABBITMQ_DEFAULT_ALLOC_ARGS=+MBas ageffcbf +MHas ageffcbf +MBlmbcs 512 +MHlmbcs 512 +MMmcs 30
diff --git a/scripts/rabbitmq-service.bat b/scripts/rabbitmq-service.bat index 7bb1f124b5..e3057f16b5 100644 --- a/scripts/rabbitmq-service.bat +++ b/scripts/rabbitmq-service.bat @@ -254,7 +254,7 @@ rem rem The defaults are meant to reduce RabbitMQ's memory usage and help
rem it reclaim memory at the cost of a slight decrease in performance
rem (due to an increase in memory operations). These defaults can be
-rem overriden using the RABBITMQ_SERVER_ERL_ARGS variable.
+rem overridden using the RABBITMQ_SERVER_ERL_ARGS variable.
set RABBITMQ_DEFAULT_ALLOC_ARGS=+MBas ageffcbf +MHas ageffcbf +MBlmbcs 512 +MHlmbcs 512 +MMmcs 30
diff --git a/src/gm.erl b/src/gm.erl index 02ee76cd60..7b9bbc8c8a 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -1271,7 +1271,7 @@ neighbour_cast(N, Msg) -> ?INSTR_MOD:cast(get_pid(N), Msg). neighbour_call(N, Msg) -> ?INSTR_MOD:call(get_pid(N), Msg, infinity). %% --------------------------------------------------------------------------- -%% View monitoring and maintanence +%% View monitoring and maintenance %% --------------------------------------------------------------------------- ensure_neighbour(_Ver, Self, {Self, undefined}, Self) -> diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index b467e6c02e..e1978a9173 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1195,7 +1195,7 @@ prioritise_cast(Msg, _Len, State) -> %% will be rate limited by how fast consumers receive messages - %% i.e. by notify_sent. We prioritise ack and resume to discourage %% starvation caused by prioritising notify_sent. We don't vary their -%% prioritiy since acks should stay in order (some parts of the queue +%% priority since acks should stay in order (some parts of the queue %% stack are optimised for that) and to make things easier to reason %% about. Finally, we prioritise ack over resume since it should %% always reduce memory use. diff --git a/src/rabbit_backing_queue.erl b/src/rabbit_backing_queue.erl index ef7a6a2337..187f55288d 100644 --- a/src/rabbit_backing_queue.erl +++ b/src/rabbit_backing_queue.erl @@ -158,7 +158,7 @@ %% Drop messages from the head of the queue while the supplied %% predicate on message properties returns true. Returns the first -%% message properties for which the predictate returned false, or +%% message properties for which the predicate returned false, or %% 'undefined' if the whole backing queue was traversed w/o the %% predicate ever returning false. -callback dropwhile(msg_pred(), state()) diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index d5dd236937..036aa9a60c 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1453,7 +1453,7 @@ handle_method(#'basic.qos'{prefetch_size = Size}, _, _State) when Size /= 0 -> handle_method(#'basic.qos'{global = false, prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter}) -> - %% Ensures that if default was set, it's overriden + %% Ensures that if default was set, it's overridden Limiter1 = rabbit_limiter:unlimit_prefetch(Limiter), {reply, #'basic.qos_ok'{}, State#ch{consumer_prefetch = PrefetchCount, limiter = Limiter1}}; diff --git a/src/rabbit_feature_flags.erl b/src/rabbit_feature_flags.erl index 8ba38179a5..74f5a34db0 100644 --- a/src/rabbit_feature_flags.erl +++ b/src/rabbit_feature_flags.erl @@ -1411,7 +1411,7 @@ does_node_support(Node, FeatureNames, Timeout) -> %% If rabbit_feature_flags:is_supported_locally/1 is undefined %% on the remote node, we consider it to be a 3.7.x node. %% - %% Theoritically, it could be an older version (3.6.x and + %% Theoretically, it could be an older version (3.6.x and %% older). But the RabbitMQ version consistency check %% (rabbit_misc:version_minor_equivalent/2) called from %% rabbit_mnesia:check_rabbit_consistency/2 already blocked diff --git a/src/rabbit_ff_registry.erl b/src/rabbit_ff_registry.erl index 2b2614bf39..46d439001f 100644 --- a/src/rabbit_ff_registry.erl +++ b/src/rabbit_ff_registry.erl @@ -148,7 +148,7 @@ is_registry_written_to_disk() -> always_return_true() -> %% This function is here to trick Dialyzer. We want some functions %% in this initial on-disk registry to always return `true` or - %% `false`. However the generated regsitry will return actual + %% `false`. However the generated registry will return actual %% booleans. The `-spec()` correctly advertises a return type of %% `boolean()`. But in the meantime, Dialyzer only knows about this %% copy which, without the trick below, would always return either diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 1fe0b38cd9..ee0f65950c 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -156,7 +156,7 @@ #update_config{}. -type command() :: protocol() | ra_machine:builtin_command(). -%% all the command types suppored by ra fifo +%% all the command types supported by ra fifo -type client_msg() :: delivery(). %% the messages `rabbit_fifo' can send to consumers. @@ -184,7 +184,7 @@ %% command: `{consumer_credit, ReceiverDeliveryCount, Credit}' credit_mode = simple_prefetch :: credit_mode(), % part of snapshot data lifetime = once :: once | auto, - suspected_down = false :: boolean() + status = up :: up | suspected_down | cancelled }). -type consumer() :: #consumer{}. @@ -193,7 +193,7 @@ {next_seqno = 1 :: msg_seqno(), % out of order enqueues - sorted list pending = [] :: [{msg_seqno(), ra_index(), raw_msg()}], - suspected_down = false :: boolean() + status = up :: up | suspected_down }). -record(state, @@ -323,7 +323,7 @@ apply(Meta, case Cons0 of #{ConsumerId := Con0} -> % need to increment metrics before completing as any snapshot - % states taken need to includ them + % states taken need to include them complete_and_checkout(Meta, MsgIds, ConsumerId, Con0, [], State); _ -> @@ -426,10 +426,7 @@ apply(Meta, #checkout{spec = {dequeue, Settlement}, end end; apply(Meta, #checkout{spec = cancel, consumer_id = ConsumerId}, State0) -> - {State, Effects} = cancel_consumer(ConsumerId, State0, []), - % TODO: here we should really demonitor the pid but _only_ if it has no - % other consumers or enqueuers. leaving a monitor in place isn't harmful - % however + {State, Effects} = cancel_consumer(ConsumerId, State0, [], consumer_cancel), checkout(Meta, State, Effects); apply(Meta, #checkout{spec = Spec, meta = ConsumerMeta, consumer_id = {_, Pid} = ConsumerId}, @@ -466,26 +463,30 @@ apply(_, {down, ConsumerPid, noconnection}, % mark all consumers and enqueuers as suspected down % and monitor the node so that we can find out the final state of the % process at some later point - {Cons, State, Effects1} = maps:fold( - fun({_, P} = K, - #consumer{checked_out = Checked0} = C, - {Co, St0, Eff}) when node(P) =:= Node -> - St = return_all(St0, Checked0), - Credit = increase_credit(C, maps:size(Checked0)), - Eff1 = ConsumerUpdateActiveFun(St, K, C, false, suspected_down, Eff), - {maps:put(K, C#consumer{suspected_down = true, - credit = Credit, - checked_out = #{}}, Co), - St, Eff1}; - (K, C, {Co, St, Eff}) -> - {maps:put(K, C, Co), St, Eff} - end, {#{}, State0, []}, Cons0), + {Cons, State, Effects1} = + maps:fold(fun({_, P} = K, + #consumer{checked_out = Checked0} = C, + {Co, St0, Eff}) when (node(P) =:= Node) and + (C#consumer.status =/= cancelled)-> + St = return_all(St0, Checked0), + Credit = increase_credit(C, maps:size(Checked0)), + Eff1 = ConsumerUpdateActiveFun(St, K, C, false, + suspected_down, Eff), + {maps:put(K, + C#consumer{status = suspected_down, + credit = Credit, + checked_out = #{}}, Co), + St, Eff1}; + (K, C, {Co, St, Eff}) -> + {maps:put(K, C, Co), St, Eff} + end, {#{}, State0, []}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = true}; + E#enqueuer{status = suspected_down}; (_, E) -> E end, Enqs0), % mark waiting consumers as suspected if necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, true), + WaitingConsumers = update_waiting_consumer_status(Node, State0, + suspected_down), Effects2 = case maps:size(Cons) of 0 -> @@ -514,12 +515,12 @@ apply(Meta, {down, Pid, _Info}, #state{consumers = Cons0, DownConsumers = maps:keys( maps:filter(fun({_, P}, _) -> P =:= Pid end, Cons0)), {State, Effects} = lists:foldl(fun(ConsumerId, {S, E}) -> - cancel_consumer(ConsumerId, S, E) + cancel_consumer(ConsumerId, S, E, down) end, {State2, Effects1}, DownConsumers), checkout(Meta, State, Effects); apply(Meta, {nodeup, Node}, #state{consumers = Cons0, - enqueuers = Enqs0, - service_queue = SQ0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -528,20 +529,20 @@ apply(Meta, {nodeup, Node}, #state{consumers = Cons0, || P <- suspected_pids_for(Node, State0)], % un-suspect waiting consumers when necessary - WaitingConsumers = maybe_mark_suspect_waiting_consumers(Node, State0, - false), + WaitingConsumers = update_waiting_consumer_status(Node, State0, up), Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> - E#enqueuer{suspected_down = false}; + E#enqueuer{status = up}; (_, E) -> E end, Enqs0), ConsumerUpdateActiveFun = consumer_active_flag_update_function(State0), {Cons1, SQ, Effects} = maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) - when node(P) =:= Node -> + when (node(P) =:= Node) and + (C#consumer.status =/= cancelled) -> EAcc1 = ConsumerUpdateActiveFun(State0, ConsumerId, C, true, up, EAcc), update_or_remove_sub( - ConsumerId, C#consumer{suspected_down = false}, + ConsumerId, C#consumer{status = up}, CAcc, SQAcc, EAcc1); (_, _, Acc) -> Acc @@ -587,26 +588,27 @@ handle_waiting_consumer_down(Pid, State = State0#state{waiting_consumers = StillUp}, {Effects, State}. -maybe_mark_suspect_waiting_consumers(_Node, #state{consumer_strategy = default}, - _Suspected) -> +update_waiting_consumer_status(_Node, #state{consumer_strategy = default}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(_Node, - #state{consumer_strategy = single_active, - waiting_consumers = []}, - _Suspected) -> +update_waiting_consumer_status(_Node, + #state{consumer_strategy = single_active, + waiting_consumers = []}, + _Status) -> []; -maybe_mark_suspect_waiting_consumers(Node, - #state{consumer_strategy = single_active, - waiting_consumers = WaitingConsumers}, - Suspected) -> +update_waiting_consumer_status(Node, + #state{consumer_strategy = single_active, + waiting_consumers = WaitingConsumers}, + Status) -> [begin case node(P) of Node -> - {ConsumerId, Consumer#consumer{suspected_down = Suspected}}; + {ConsumerId, Consumer#consumer{status = Status}}; _ -> {ConsumerId, Consumer} end - end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers]. + end || {{_, P} = ConsumerId, Consumer} <- WaitingConsumers, + Consumer#consumer.status =/= cancelled]. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). state_enter(leader, #state{consumers = Cons, @@ -691,7 +693,7 @@ get_checked_out(Cid, From, To, #state{consumers = Consumers}) -> end. init_aux(Name) when is_atom(Name) -> - %% TODO: catch specific exeption throw if table already exists + %% TODO: catch specific exception throw if table already exists ok = ra_machine_ets:create_table(rabbit_fifo_usage, [named_table, set, public, {write_concurrency, true}]), @@ -739,12 +741,13 @@ query_consumers(#state{consumers = Consumers, consumer_strategy = ConsumerStrategy } = State) -> ActiveActivityStatusFun = case ConsumerStrategy of default -> - fun(_ConsumerId, #consumer{suspected_down = SuspectedDown}) -> - case SuspectedDown of - true -> - {false, suspected_down}; - false -> - {true, up} + fun(_ConsumerId, + #consumer{status = Status}) -> + case Status of + suspected_down -> + {false, Status}; + _ -> + {true, Status} end end; single_active -> @@ -758,18 +761,24 @@ query_consumers(#state{consumers = Consumers, end end end, - FromConsumers = maps:map(fun ({Tag, Pid}, #consumer{meta = Meta} = Consumer) -> - {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), - {Pid, Tag, - maps:get(ack, Meta, undefined), - maps:get(prefetch, Meta, undefined), - Active, - ActivityStatus, - maps:get(args, Meta, []), - maps:get(username, Meta, undefined)} - end, Consumers), + FromConsumers = maps:fold(fun (_, #consumer{status = cancelled}, Acc) -> + Acc; + ({Tag, Pid}, #consumer{meta = Meta} = Consumer, Acc) -> + {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), + maps:put({Tag, Pid}, + {Pid, Tag, + maps:get(ack, Meta, undefined), + maps:get(prefetch, Meta, undefined), + Active, + ActivityStatus, + maps:get(args, Meta, []), + maps:get(username, Meta, undefined)}, + Acc) + end, #{}, Consumers), FromWaitingConsumers = - lists:foldl(fun({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> + lists:foldl(fun ({_, #consumer{status = cancelled}}, Acc) -> + Acc; + ({{Tag, Pid}, #consumer{meta = Meta} = Consumer}, Acc) -> {Active, ActivityStatus} = ActiveActivityStatusFun({Tag, Pid}, Consumer), maps:put({Tag, Pid}, {Pid, Tag, @@ -854,42 +863,42 @@ num_checked_out(#state{consumers = Cons}) -> end, 0, maps:values(Cons)). cancel_consumer(ConsumerId, - #state{consumer_strategy = default} = State, Effects) -> + #state{consumer_strategy = default} = State, Effects, Reason) -> %% general case, single active consumer off - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumer_strategy = single_active, waiting_consumers = []} = State, - Effects) -> + Effects, Reason) -> %% single active consumer on, no consumers are waiting - cancel_consumer0(ConsumerId, State, Effects); + cancel_consumer0(ConsumerId, State, Effects, Reason); cancel_consumer(ConsumerId, #state{consumers = Cons0, consumer_strategy = single_active, waiting_consumers = WaitingConsumers0} = State0, - Effects0) -> + Effects0, Reason) -> %% single active consumer on, consumers are waiting case maps:take(ConsumerId, Cons0) of - {#consumer{checked_out = Checked0}, _} -> + {Consumer, Cons1} -> % The active consumer is to be removed % Cancel it - State1 = return_all(State0, Checked0), - Effects1 = cancel_consumer_effects(ConsumerId, State1, Effects0), + {State1, Effects1} = maybe_return_all(ConsumerId, Consumer, Cons1, State0, Effects0, Reason), + Effects2 = cancel_consumer_effects(ConsumerId, State1, Effects1), % Take another one from the waiting consumers and put it in consumers [{NewActiveConsumerId, NewActiveConsumer} | RemainingWaitingConsumers] = WaitingConsumers0, - #state{service_queue = ServiceQueue} = State0, + #state{service_queue = ServiceQueue} = State1, ServiceQueue1 = maybe_queue_consumer(NewActiveConsumerId, NewActiveConsumer, ServiceQueue), - State = State1#state{consumers = #{NewActiveConsumerId => - NewActiveConsumer}, + State = State1#state{consumers = maps:put(NewActiveConsumerId, + NewActiveConsumer, State1#state.consumers), service_queue = ServiceQueue1, waiting_consumers = RemainingWaitingConsumers}, - Effects2 = consumer_update_active_effects(State, NewActiveConsumerId, + Effects = consumer_update_active_effects(State, NewActiveConsumerId, NewActiveConsumer, true, - single_active, Effects1), - {State, Effects2}; + single_active, Effects2), + {State, Effects}; error -> % The cancelled consumer is not the active one % Just remove it from idle_consumers @@ -914,23 +923,39 @@ consumer_update_active_effects(#state{queue_resource = QName }, [QName, ConsumerId, false, Ack, Prefetch, Active, ActivityStatus, Args]} | Effects]. -cancel_consumer0(ConsumerId, - #state{consumers = C0} = S0, Effects0) -> +cancel_consumer0(ConsumerId, #state{consumers = C0} = S0, Effects0, Reason) -> case maps:take(ConsumerId, C0) of - {#consumer{checked_out = Checked0}, Cons} -> - S = return_all(S0, Checked0), - Effects = cancel_consumer_effects(ConsumerId, S, Effects0), - case maps:size(Cons) of + {Consumer, Cons1} -> + {S, Effects2} = maybe_return_all(ConsumerId, Consumer, Cons1, S0, Effects0, Reason), + Effects = cancel_consumer_effects(ConsumerId, S, Effects2), + case maps:size(S#state.consumers) of 0 -> - {S#state{consumers = Cons}, [{aux, inactive} | Effects]}; + {S, [{aux, inactive} | Effects]}; _ -> - {S#state{consumers = Cons}, Effects} + {S, Effects} end; error -> %% already removed: do nothing {S0, Effects0} end. +maybe_return_all(ConsumerId, #consumer{checked_out = Checked0} = Consumer, Cons1, + #state{consumers = C0, + service_queue = SQ0} = S0, Effects0, Reason) -> + case Reason of + consumer_cancel -> + {Cons, SQ, Effects1} = + update_or_remove_sub(ConsumerId, + Consumer#consumer{lifetime = once, + credit = 0, + status = cancelled}, + C0, SQ0, Effects0), + {S0#state{consumers = Cons, service_queue = SQ}, Effects1}; + down -> + S1 = return_all(S0, Checked0), + {S1#state{consumers = Cons1}, Effects0} + end. + apply_enqueue(#{index := RaftIdx} = Meta, From, Seq, RawMsg, State0) -> Bytes = message_size(RawMsg), case maybe_enqueue(RaftIdx, From, Seq, RawMsg, [], State0) of @@ -1083,7 +1108,7 @@ increase_credit(#consumer{lifetime = once, increase_credit(#consumer{lifetime = auto, credit_mode = credited, credit = Credit}, _) -> - %% credit_mode: credit also doens't automatically increment credit + %% credit_mode: credit also doesn't automatically increment credit Credit; increase_credit(#consumer{credit = Current}, Credit) -> Current + Credit. @@ -1303,7 +1328,9 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); - {ok, #consumer{suspected_down = true}} -> + {ok, #consumer{status = cancelled}} -> + checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{status = suspected_down}} -> checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, @@ -1358,8 +1385,9 @@ update_or_remove_sub(ConsumerId, #consumer{lifetime = once, case maps:size(Checked) of 0 -> % we're done with this consumer - {maps:remove(ConsumerId, Cons), ServiceQueue, - [{demonitor, process, ConsumerId} | Effects]}; + % TODO: demonitor consumer pid but _only_ if there are no other + % monitors for this pid + {maps:remove(ConsumerId, Cons), ServiceQueue, Effects}; _ -> % there are unsettled items so need to keep around {maps:put(ConsumerId, Con, Cons), ServiceQueue, Effects} @@ -1379,7 +1407,6 @@ uniq_queue_in(Key, Queue) -> queue:in(Key, Queue) end. - update_consumer(ConsumerId, Meta, Spec, #state{consumer_strategy = default} = State0) -> %% general case, single active consumer off @@ -1553,18 +1580,18 @@ message_size(Msg) -> suspected_pids_for(Node, #state{consumers = Cons0, enqueuers = Enqs0, waiting_consumers = WaitingConsumers0}) -> - Cons = maps:fold(fun({_, P}, #consumer{suspected_down = true}, Acc) + Cons = maps:fold(fun({_, P}, #consumer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, [], Cons0), - Enqs = maps:fold(fun(P, #enqueuer{suspected_down = true}, Acc) + Enqs = maps:fold(fun(P, #enqueuer{status = suspected_down}, Acc) when node(P) =:= Node -> [P | Acc]; (_, _, Acc) -> Acc end, Cons, Enqs0), lists:foldl(fun({{_, P}, - #consumer{suspected_down = true}}, Acc) + #consumer{status = suspected_down}}, Acc) when node(P) =:= Node -> [P | Acc]; (_, Acc) -> Acc @@ -1814,10 +1841,11 @@ return_checked_out_test() -> {State0, [_, _]} = enq(1, 1, first, test_init(test)), {State1, [_Monitor, {send_msg, _, {delivery, _, [{MsgId, _}]}, ra_event}, - {aux, active} | _ - ]} = check(Cid, 2, State0), - % return - {_State2, _, [_]} = apply(meta(3), make_return(Cid, [MsgId]), State1), + {aux, active} | _ ]} = check_auto(Cid, 2, State0), + % returning immediately checks out the same message again + {_, ok, [{send_msg, _, {delivery, _, [{_, _}]}, ra_event}, + {aux, active}]} = + apply(meta(3), make_return(Cid, [MsgId]), State1), ok. return_auto_checked_out_test() -> @@ -1844,15 +1872,19 @@ cancelled_checkout_out_test() -> {State00, [_, _]} = enq(1, 1, first, test_init(test)), {State0, [_]} = enq(2, 2, second, State00), {State1, _} = check_auto(Cid, 2, State0), - % cancelled checkout should return all pending messages to queue + % cancelled checkout should not return pending messages to queue {State2, _, _} = apply(meta(3), make_checkout(Cid, cancel, #{}), State1), ?assertEqual(1, maps:size(State2#state.messages)), - ?assertEqual(1, lqueue:len(State2#state.returns)), + ?assertEqual(0, lqueue:len(State2#state.returns)), - {State3, {dequeue, {0, {_, first}}, _}, _} = + {State3, {dequeue, empty}} = apply(meta(3), make_checkout(Cid, {dequeue, settled}, #{}), State2), + %% settle + {State4, ok, _} = + apply(meta(4), make_settle(Cid, [0]), State3), + {_State, {dequeue, {_, {_, second}}, _}, _} = - apply(meta(4), make_checkout(Cid, {dequeue, settled}, #{}), State3), + apply(meta(5), make_checkout(Cid, {dequeue, settled}, #{}), State4), ok. down_with_noproc_consumer_returns_unsettled_test() -> @@ -1914,16 +1946,6 @@ down_with_noproc_enqueuer_is_cleaned_up_test() -> ?assert(0 =:= maps:size(State1#state.enqueuers)), ok. -completed_consumer_yields_demonitor_effect_test() -> - Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, - {State0, [_, _]} = enq(1, 1, second, test_init(test)), - {State1, [{monitor, process, _} | _]} = check(Cid, 2, State0), - {_, Effects} = settle(Cid, 3, 0, State1), - ?ASSERT_EFF({demonitor, _, _}, Effects), - % release cursor for empty queue - ?ASSERT_EFF({release_cursor, 3, _}, Effects), - ok. - discarded_message_without_dead_letter_handler_is_removed_test() -> Cid = {<<"completed_consumer_yields_demonitor_effect_test">>, self()}, {State0, [_, _]} = enq(1, 1, first, test_init(test)), @@ -2117,7 +2139,7 @@ enq_check_settle_duplicate_test() -> run_snapshot_test(?FUNCTION_NAME, Commands). run_snapshot_test(Name, Commands) -> - %% create every incremental permuation of the commands lists + %% create every incremental permutation of the commands lists %% and run the snapshot tests against that [begin run_snapshot_test0(Name, C) @@ -2186,7 +2208,7 @@ state_enter_test() -> [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. -state_enter_montors_and_notifications_test() -> +state_enter_monitors_and_notifications_test() -> Oth = spawn(fun () -> ok end), {State0, _} = enq(1, 1, first, test_init(test)), Cid = {<<"adf">>, self()}, @@ -2299,7 +2321,10 @@ single_active_consumer_test() -> ?assertEqual(1, length(Effects1)), % cancelling the active consumer - {State3, _, Effects2} = apply(meta(3), #checkout{spec = cancel, consumer_id = {<<"ctag1">>, self()}}, State2), + {State3, _, Effects2} = apply(meta(3), + #checkout{spec = cancel, + consumer_id = {<<"ctag1">>, self()}}, + State2), % the second registered consumer is now the active one ?assertEqual(1, map_size(State3#state.consumers)), ?assert(maps:is_key({<<"ctag2">>, self()}, State3#state.consumers)), @@ -2402,15 +2427,16 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti State), NewState end, - State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), + State1 = lists:foldl(AddConsumer, State0, + [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), % simulate node goes down {State2, _, _} = apply(#{}, {down, self(), noconnection}, State1), % all the waiting consumers should be suspected down ?assertEqual(3, length(State2#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assert(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status == suspected_down) end, State2#state.waiting_consumers), % simulate node goes back up @@ -2418,8 +2444,8 @@ single_active_consumer_mark_waiting_consumers_as_suspected_when_down_noconnnecti % all the waiting consumers should be un-suspected ?assertEqual(3, length(State3#state.waiting_consumers)), - lists:foreach(fun({_, #consumer{suspected_down = SuspectedDown}}) -> - ?assertNot(SuspectedDown) + lists:foreach(fun({_, #consumer{status = Status}}) -> + ?assert(Status /= suspected_down) end, State3#state.waiting_consumers), ok. @@ -2504,7 +2530,8 @@ query_consumers_test() -> State1 = lists:foldl(AddConsumer, State0, [<<"ctag1">>, <<"ctag2">>, <<"ctag3">>, <<"ctag4">>]), Consumers0 = State1#state.consumers, Consumer = maps:get({<<"ctag2">>, self()}, Consumers0), - Consumers1 = maps:put({<<"ctag2">>, self()}, Consumer#consumer{suspected_down = true}, Consumers0), + Consumers1 = maps:put({<<"ctag2">>, self()}, + Consumer#consumer{status = suspected_down}, Consumers0), State2 = State1#state{consumers = Consumers1}, ?assertEqual(4, query_consumer_count(State2)), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 5d9410918d..04918c3eb9 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -165,7 +165,7 @@ enqueue(Msg, State) -> %% @doc Dequeue a message from the queue. %% -%% This is a syncronous call. I.e. the call will block until the command +%% This is a synchronous call. I.e. the call will block until the command %% has been accepted by the ra process or it times out. %% %% @param ConsumerTag a unique tag to identify this particular consumer. @@ -297,7 +297,7 @@ discard(ConsumerTag, [_|_] = MsgIds, %% @doc Register with the rabbit_fifo queue to "checkout" messages as they %% become available. %% -%% This is a syncronous call. I.e. the call will block until the command +%% This is a synchronous call. I.e. the call will block until the command %% has been accepted by the ra process or it times out. %% %% @param ConsumerTag a unique tag to identify this particular consumer. @@ -316,7 +316,7 @@ checkout(ConsumerTag, NumUnsettled, ConsumerInfo, State0) -> %% @doc Register with the rabbit_fifo queue to "checkout" messages as they %% become available. %% -%% This is a syncronous call. I.e. the call will block until the command +%% This is a synchronous call. I.e. the call will block until the command %% has been accepted by the ra process or it times out. %% %% @param ConsumerTag a unique tag to identify this particular consumer. @@ -374,7 +374,7 @@ credit(ConsumerTag, Credit, Drain, %% @doc Cancels a checkout with the rabbit_fifo queue for the consumer tag %% -%% This is a syncronous call. I.e. the call will block until the command +%% This is a synchronous call. I.e. the call will block until the command %% has been accepted by the ra process or it times out. %% %% @param ConsumerTag a unique tag to identify this particular consumer. diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index 82a75b4adc..3bda9bab26 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -87,7 +87,7 @@ find_next(Next, Last, Map) -> _ -> % in degenerate cases the range here could be very large % and hence this could be very slow - % the typical case should idealy be better + % the typical case should ideally be better % assuming fifo-ish deletion of entries find_next(Next+1, Last, Map) end. diff --git a/src/rabbit_guid.erl b/src/rabbit_guid.erl index 6f03a1a04f..5728ac88d7 100644 --- a/src/rabbit_guid.erl +++ b/src/rabbit_guid.erl @@ -92,7 +92,7 @@ advance_blocks({B1, B2, B3, B4}, I) -> %% hashing {B5, I}. The new hash is used as last block, and the %% other three blocks are XORed with it. %% - %% Doing this is convenient because it avoids cascading conflits, + %% Doing this is convenient because it avoids cascading conflicts, %% while being very fast. The conflicts are avoided by propagating %% the changes through all the blocks at each round by XORing, so %% the only occasion in which a collision will take place is when diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl index 1a72d15009..bfdd26fee3 100644 --- a/src/rabbit_lager.erl +++ b/src/rabbit_lager.erl @@ -396,7 +396,7 @@ prepare_rabbit_log_config() -> set_env_default_log_console(); FileName when is_list(FileName) -> case os:getenv("RABBITMQ_LOGS_source") of - %% The user explicitely sets $RABBITMQ_LOGS; + %% The user explicitly sets $RABBITMQ_LOGS; %% we should override a file location even %% if it's set in rabbitmq.config "environment" -> set_env_default_log_file(FileName, override); @@ -406,7 +406,7 @@ prepare_rabbit_log_config() -> %% Upgrade log file never overrides the value set in rabbitmq.config case UpgradeFile of - %% No special env for upgrade logs - rederect to the default sink + %% No special env for upgrade logs - redirect to the default sink undefined -> ok; %% Redirect logs to default output. DefaultFile -> ok; diff --git a/src/rabbit_mirror_queue_coordinator.erl b/src/rabbit_mirror_queue_coordinator.erl index 96474b0d4e..b502da201a 100644 --- a/src/rabbit_mirror_queue_coordinator.erl +++ b/src/rabbit_mirror_queue_coordinator.erl @@ -364,7 +364,7 @@ handle_cast({gm_deaths, DeadGMPids}, State = #state{q = Q}) when ?amqqueue_pid_r %% Different slave is now master, stop current coordinator normally. %% Initiating queue is now slave and the least we could do is report %% deaths which we 'think' we saw. - %% NOTE: Reported deaths here, could be inconsistant. + %% NOTE: Reported deaths here, could be inconsistent. rabbit_mirror_queue_misc:report_deaths(MPid, false, QueueName, DeadPids), {stop, shutdown, State}; diff --git a/src/rabbit_mirror_queue_misc.erl b/src/rabbit_mirror_queue_misc.erl index 04ed4e2887..7383152898 100644 --- a/src/rabbit_mirror_queue_misc.erl +++ b/src/rabbit_mirror_queue_misc.erl @@ -615,7 +615,7 @@ validate_sync_batch_size(none) -> validate_sync_batch_size(N) when is_integer(N) andalso N > 0 -> ok; validate_sync_batch_size(N) -> - {error, "ha-sync-batch-size takes an integer greather than 0, " + {error, "ha-sync-batch-size takes an integer greater than 0, " "~p given", [N]}. validate_pos(PromoteOnShutdown) -> diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index 337064ad39..a4dfba47e7 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -236,7 +236,7 @@ %% updated. %% %% On non-clean startup, we scan the files we discover, dealing with -%% the possibilites of a crash having occurred during a compaction +%% the possibilities of a crash having occurred during a compaction %% (this consists of tidyup - the compaction is deliberately designed %% such that data is duplicated on disk rather than risking it being %% lost), and rebuild the file summary and index ETS table. @@ -596,7 +596,7 @@ client_read2(false, undefined, _MsgLocation, Defer, _CState) -> Defer(); client_read2(true, _Right, _MsgLocation, Defer, _CState) -> %% Of course, in the mean time, the GC could have run and our msg - %% is actually in a different file, unlocked. However, defering is + %% is actually in a different file, unlocked. However, deferring is %% the safest and simplest thing to do. Defer(); client_read2(false, _Right, diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index cd46ade0e2..9fa1dfa462 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -219,7 +219,7 @@ subscribe(Pid) -> %% We could confirm something by having an HA queue see the pausing %% state (and fail over into it) before the node monitor stops us, or %% by using unmirrored queues and just having them vanish (and -%% confiming messages as thrown away). +%% confirming messages as thrown away). %% %% So we have channels call in here before issuing confirms, to do a %% lightweight check that we have not entered a pausing state. diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 621f42dafb..12d3291b54 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -22,7 +22,7 @@ -behaviour(rabbit_backing_queue). -%% enabled unconditionally. Disabling priority queueing after +%% enabled unconditionally. Disabling priority queuing after %% it has been enabled is dangerous. -rabbit_boot_step({?MODULE, [{description, "enable priority queue"}, @@ -55,7 +55,7 @@ -define(passthrough3(F), {Res1, Res2, BQS1} = BQ:F, {Res1, Res2, State#passthrough{bqs = BQS1}}). -%% This module adds suport for priority queues. +%% This module adds support for priority queues. %% %% Priority queues have one backing queue per priority. Backing queue functions %% then produce a list of results for each BQ and fold over them, sorting diff --git a/src/rabbit_queue_index.erl b/src/rabbit_queue_index.erl index e4047a9902..61373e49c1 100644 --- a/src/rabbit_queue_index.erl +++ b/src/rabbit_queue_index.erl @@ -355,7 +355,7 @@ pre_publish(MsgOrId, SeqId, MsgProps, IsPersistent, IsDelivered, JournalSizeHint State1#qistate{pre_publish_cache = PPC1, delivered_cache = DC1})). -%% pre_publish_cache is the entry with most elements when comapred to +%% pre_publish_cache is the entry with most elements when compared to %% delivered_cache so we only check the former in the guard. maybe_flush_pre_publish_cache(JournalSizeHint, #qistate{pre_publish_cache = PPC} = State) diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 3a2d4650ca..c0cb9c57d5 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -240,7 +240,7 @@ server_properties(Protocol) -> {ok, RawConfigServerProps} = application:get_env(rabbit, server_properties), - %% Normalize the simplifed (2-tuple) and unsimplified (3-tuple) forms + %% Normalize the simplified (2-tuple) and unsimplified (3-tuple) forms %% from the config and merge them with the generated built-in properties NormalizedConfigServerProps = [{<<"capabilities">>, table, server_capabilities(Protocol)} | @@ -852,7 +852,7 @@ handle_exception(State = #v1{connection = #connection{protocol = Protocol}, respond_and_close(State, Channel, Protocol, Reason, {handshake_error, CS, Reason}); %% when negotiation fails, e.g. due to channel_max being higher than the -%% maxiumum allowed limit +%% maximum allowed limit handle_exception(State = #v1{connection = #connection{protocol = Protocol, log_name = ConnName, user = User}, diff --git a/src/rabbit_upgrade.erl b/src/rabbit_upgrade.erl index f452d5c92f..83ec692ff3 100644 --- a/src/rabbit_upgrade.erl +++ b/src/rabbit_upgrade.erl @@ -31,7 +31,7 @@ %% clusters. %% %% Firstly, we have two different types of upgrades to do: Mnesia and -%% everythinq else. Mnesia upgrades must only be done by one node in +%% everything else. Mnesia upgrades must only be done by one node in %% the cluster (we treat a non-clustered node as a single-node %% cluster). This is the primary upgrader. The other upgrades need to %% be done by all nodes. @@ -314,7 +314,7 @@ node_type_legacy() -> %% hang), we can't look at the config file (may not include us %% even if we're a disc node). We also can't use %% rabbit_mnesia:node_type/0 because that will give false - %% postivies on Rabbit up to 2.5.1. + %% positives on Rabbit up to 2.5.1. case filelib:is_regular(filename:join(dir(), "rabbit_durable_exchange.DCD")) of true -> disc; false -> ram diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 8b773f2cc2..e2b4f64a8f 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1735,7 +1735,7 @@ purge_and_index_reset(State) -> %% %% purge_betas_and_deltas/2 loads messages from the queue index, %% filling up q3 and in some cases moving messages form q2 to q3 while -%% reseting q2 to an empty queue (see maybe_deltas_to_betas/2). The +%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The %% messages loaded into q3 are removed by calling %% remove_queue_entries/3 until there are no more messages to be read %% from the queue index. Messages are read in batches from the queue @@ -1939,7 +1939,7 @@ maybe_write_msg_to_disk(Force, MsgStatus = #msg_status { maybe_write_msg_to_disk(_Force, MsgStatus, State) -> {MsgStatus, State}. -%% Due to certain optimizations made inside +%% Due to certain optimisations made inside %% rabbit_queue_index:pre_publish/7 we need to have two separate %% functions for index persistence. This one is only used when paging %% during memory pressure. We didn't want to modify diff --git a/test/channel_operation_timeout_SUITE.erl b/test/channel_operation_timeout_SUITE.erl index 77da6133d8..47907d09f4 100644 --- a/test/channel_operation_timeout_SUITE.erl +++ b/test/channel_operation_timeout_SUITE.erl @@ -87,7 +87,7 @@ notify_down_all(Config) -> declare(QCfg0), %% Testing rabbit_amqqueue:notify_down_all via rabbit_channel. %% Consumer count = 0 after correct channel termination and - %% notification of queues via delagate:call/3 + %% notification of queues via delegate:call/3 true = (0 =/= length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST))), rabbit_ct_client_helpers:close_channel(RabbitCh), 0 = length(get_consumers(Config, Rabbit, ?DEFAULT_VHOST)), diff --git a/test/clustering_management_SUITE.erl b/test/clustering_management_SUITE.erl index 5ae2fb687c..a4fd63ba04 100644 --- a/test/clustering_management_SUITE.erl +++ b/test/clustering_management_SUITE.erl @@ -153,9 +153,9 @@ join_and_part_cluster(Config) -> join_cluster_bad_operations(Config) -> [Rabbit, Hare, Bunny] = cluster_members(Config), - %% Non-existant node + %% Nonexistent node ok = stop_app(Rabbit), - assert_failure(fun () -> join_cluster(Rabbit, non@existant) end), + assert_failure(fun () -> join_cluster(Rabbit, non@existent) end), ok = start_app(Rabbit), assert_not_clustered(Rabbit), @@ -217,8 +217,8 @@ forget_cluster_node(Config) -> ok = stop_app(Rabbit), %% We're passing the --offline flag, but Hare is online assert_failure(fun () -> forget_cluster_node(Hare, Rabbit, true) end), - %% Removing some non-existant node will fail - assert_failure(fun () -> forget_cluster_node(Hare, non@existant) end), + %% Removing some nonexistent node will fail + assert_failure(fun () -> forget_cluster_node(Hare, non@existent) end), ok = forget_cluster_node(Hare, Rabbit), assert_not_clustered(Hare), assert_cluster_status({[Rabbit, Hare], [Rabbit, Hare], [Hare]}, @@ -504,8 +504,8 @@ update_cluster_nodes(Config) -> stop_reset_start(Hare), assert_failure(fun () -> start_app(Rabbit) end), %% Bogus node - assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existant) end), - %% Inconsisent node + assert_failure(fun () -> update_cluster_nodes(Rabbit, non@existent) end), + %% Inconsistent node assert_failure(fun () -> update_cluster_nodes(Rabbit, Hare) end), ok = update_cluster_nodes(Rabbit, Bunny), ok = start_app(Rabbit), diff --git a/test/dead_lettering_SUITE.erl b/test/dead_lettering_SUITE.erl index 72c72b096c..6722958973 100644 --- a/test/dead_lettering_SUITE.erl +++ b/test/dead_lettering_SUITE.erl @@ -577,7 +577,7 @@ dead_letter_routing_key_header_BCC(Config) -> %% It is possible to form a cycle of message dead-lettering. For instance, %% this can happen when a queue dead-letters messages to the default exchange without -%% specifiying a dead-letter routing key (5). Messages in such cycles (i.e. messages that +%% specifying a dead-letter routing key (5). Messages in such cycles (i.e. messages that %% reach the same queue twice) will be dropped if there was no rejections in the entire cycle. %% i.e. x-message-ttl (7), x-max-length (6) %% @@ -741,7 +741,7 @@ dead_letter_override_policy(Config) -> [_] = consume(Ch, DLXQName, [P1]). %% 9) Policy is set after have declared a queue with dead letter arguments. Policy will be -%% overriden/ignored. +%% overridden/ignored. dead_letter_ignore_policy(Config) -> {_Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), QName = ?config(queue_name, Config), diff --git a/test/dynamic_ha_SUITE.erl b/test/dynamic_ha_SUITE.erl index 6ccf3a75c3..3bdf7bb009 100644 --- a/test/dynamic_ha_SUITE.erl +++ b/test/dynamic_ha_SUITE.erl @@ -484,7 +484,7 @@ failing_random_policies(Config) -> [A, B | _] = Nodes = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), %% Those set of policies were found as failing by PropEr in the - %% `random_policy` test above. We add them explicitely here to make + %% `random_policy` test above. We add them explicitly here to make %% sure they get tested. ?assertEqual(true, test_random_policy(Config, Nodes, [{nodes, [A, B]}, {nodes, [A]}])), diff --git a/test/metrics_SUITE.erl b/test/metrics_SUITE.erl index 44368b643d..d00c01408e 100644 --- a/test/metrics_SUITE.erl +++ b/test/metrics_SUITE.erl @@ -144,7 +144,7 @@ connection_metric_idemp(Config, {N, R}) -> || _ <- lists:seq(1, N)], Table = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_coarse_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], force_metric_gc(Config), TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, connection_metrics)], @@ -158,7 +158,7 @@ channel_metric_idemp(Config, {N, R}) -> [amqp_connection:open_channel(Conn) || _ <- lists:seq(1, N)], Table = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], Table2 = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_process_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times [[Pid ! emit_stats || Pid <- Table] || _ <- lists:seq(1, R)], force_metric_gc(Config), TableAfter = [ Pid || {Pid, _} <- read_table_rpc(Config, channel_metrics)], @@ -181,7 +181,7 @@ queue_metric_idemp(Config, {N, R}) -> Table = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_metrics)], Table2 = [ Pid || {Pid, _, _} <- read_table_rpc(Config, queue_coarse_metrics)], - % referesh stats 'R' times + % refresh stats 'R' times ChanTable = read_table_rpc(Config, channel_created), [[Pid ! emit_stats || {Pid, _, _} <- ChanTable ] || _ <- lists:seq(1, R)], force_metric_gc(Config), diff --git a/test/queue_master_location_SUITE.erl b/test/queue_master_location_SUITE.erl index d4c8da2bcb..677355f5e8 100644 --- a/test/queue_master_location_SUITE.erl +++ b/test/queue_master_location_SUITE.erl @@ -22,7 +22,7 @@ %% location strategies can be applied in the following ways; %% 1. As policy, %% 2. As config (in rabbitmq.config), -%% 3. or as part of the queue's declare arguements. +%% 3. or as part of the queue's declare arguments. %% %% Currently supported strategies are; %% min-masters : The queue master node is calculated as the one with the diff --git a/test/queue_parallel_SUITE.erl b/test/queue_parallel_SUITE.erl new file mode 100644 index 0000000000..1290c97b23 --- /dev/null +++ b/test/queue_parallel_SUITE.erl @@ -0,0 +1,587 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License at +%% http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the +%% License for the specific language governing rights and limitations +%% under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2011-2019 Pivotal Software, Inc. All rights reserved. +%% +%% +-module(queue_parallel_SUITE). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("kernel/include/file.hrl"). +-include_lib("amqp_client/include/amqp_client.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +-compile(export_all). + +-define(TIMEOUT, 30000). + +-import(quorum_queue_utils, [wait_for_messages/2]). + +all() -> + [ + {group, parallel_tests} + ]. + +groups() -> + AllTests = [publish, + consume, + consume_first_empty, + consume_from_empty_queue, + consume_and_autoack, + subscribe, + subscribe_with_autoack, + consume_and_ack, + consume_and_multiple_ack, + subscribe_and_ack, + subscribe_and_multiple_ack, + subscribe_and_requeue_multiple_nack, + subscribe_and_nack, + subscribe_and_requeue_nack, + subscribe_and_multiple_nack, + consume_and_requeue_nack, + consume_and_nack, + consume_and_requeue_multiple_nack, + consume_and_multiple_nack, + basic_cancel, + purge, + basic_recover, + delete_immediately_by_resource + ], + [ + {parallel_tests, [], + [ + {classic_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {mirrored_queue, [parallel], AllTests ++ [delete_immediately_by_pid_succeeds]}, + {quorum_queue, [parallel], AllTests ++ [delete_immediately_by_pid_fails]} + ]} + ]. + +suite() -> + [ + {timetrap, {minutes, 3}} + ]. + +%% ------------------------------------------------------------------- +%% Testsuite setup/teardown. +%% ------------------------------------------------------------------- + +init_per_suite(Config) -> + rabbit_ct_helpers:log_environment(), + rabbit_ct_helpers:run_setup_steps(Config). + +end_per_suite(Config) -> + rabbit_ct_helpers:run_teardown_steps(Config). + +init_per_group(classic_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]); +init_per_group(quorum_queue, Config) -> + rabbit_ct_helpers:set_config( + Config, + [{queue_args, [{<<"x-queue-type">>, longstr, <<"quorum">>}]}, + {queue_durable, true}]); +init_per_group(mirrored_queue, Config) -> + rabbit_ct_broker_helpers:set_ha_policy(Config, 0, <<"^max_length.*queue">>, + <<"all">>, [{<<"ha-sync-mode">>, <<"automatic">>}]), + Config1 = rabbit_ct_helpers:set_config( + Config, [{is_mirrored, true}, + {queue_args, [{<<"x-queue-type">>, longstr, <<"classic">>}]}, + {queue_durable, true}]), + rabbit_ct_helpers:run_steps(Config1, []); +init_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + ClusterSize = 2, + Config1 = rabbit_ct_helpers:set_config(Config, [ + {rmq_nodename_suffix, Group}, + {rmq_nodes_count, ClusterSize} + ]), + rabbit_ct_helpers:run_steps(Config1, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps()); + false -> + rabbit_ct_helpers:run_steps(Config, []) + end. + +end_per_group(Group, Config) -> + case lists:member({group, Group}, all()) of + true -> + rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()); + false -> + Config + end. + +init_per_testcase(Testcase, Config) -> + Group = proplists:get_value(name, ?config(tc_group_properties, Config)), + Q = rabbit_data_coercion:to_binary(io_lib:format("~p_~p", [Group, Testcase])), + Q2 = rabbit_data_coercion:to_binary(io_lib:format("~p_~p_2", [Group, Testcase])), + Config1 = rabbit_ct_helpers:set_config(Config, [{queue_name, Q}, + {queue_name_2, Q2}]), + rabbit_ct_helpers:testcase_started(Config1, Testcase). + +end_per_testcase(Testcase, Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name, Config)}), + amqp_channel:call(Ch, #'queue.delete'{queue = ?config(queue_name_2, Config)}), + rabbit_ct_helpers:testcase_finished(Config, Testcase). + +publish(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +consume_first_empty(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, [<<"msg1">>]), + rabbit_ct_client_helpers:close_channel(Ch). + +consume_from_empty_queue(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + consume_empty(Ch, QName). + +consume_and_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + consume(Ch, QName, true, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + ?assertMatch(#'basic.qos_ok'{}, + amqp_channel:call(Ch, #'basic.qos'{global = false, + prefetch_count = 10})), + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + + subscribe(Ch, QName, false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + + %% validate we can retrieve the consumers + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + [Consumer] = lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers), + ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), + ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), + ?assertEqual(true, proplists:get_value(ack_required, Consumer)), + ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), + ?assertEqual([], proplists:get_value(arguments, Consumer)), + + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +subscribe_with_autoack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + subscribe(Ch, QName, true), + receive_basic_deliver(false), + receive_basic_deliver(false), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]), + rabbit_ct_client_helpers:close_channel(Ch), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_ack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + receive_basic_deliver(true), + receive_basic_deliver(true), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, + multiple = true}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +consume_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]). + +consume_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [DeliveryTag] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +consume_and_requeue_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = true}), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]). + +consume_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + [_, _, DeliveryTag] = consume(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]). + +subscribe_and_requeue_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end + end. + +subscribe_and_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +subscribe_and_multiple_nack(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"3">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive_basic_deliver(false), + receive_basic_deliver(false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + wait_for_messages(Config, [[QName, <<"3">>, <<"0">>, <<"3">>]]), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = true, + requeue = false}), + wait_for_messages(Config, [[QName, <<"0">>, <<"0">>, <<"0">>]]) + end. + +%% TODO test with single active +basic_cancel(Config) -> + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + subscribe(Ch, QName, false), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), + Consumers = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + ?assertEqual([], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + QName == Resource#resource.name + end, Consumers)), + publish(Ch, QName, [<<"msg2">>, <<"msg3">>]), + wait_for_messages(Config, [[QName, <<"3">>, <<"2">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]) + after 5000 -> + exit(basic_deliver_timeout) + end. + +purge(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>, <<"msg2">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"2">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"2">>, <<"1">>, <<"1">>]]), + {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QName}), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]). + +basic_recover(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + publish(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]), + [_] = consume(Ch, QName, [<<"msg1">>]), + wait_for_messages(Config, [[QName, <<"1">>, <<"0">>, <<"1">>]]), + amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), + wait_for_messages(Config, [[QName, <<"1">>, <<"1">>, <<"0">>]]). + +delete_immediately_by_pid_fails(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QName, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_pid_succeeds(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*ok.*", [{capture, none}])), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + QName = ?config(queue_name, Config), + declare_queue(Ch, Config, QName), + + Cmd = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QName) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd)), + + ?assertExit( + {{shutdown, {server_initiated_close, 404, _}}, _}, + amqp_channel:call(Ch, #'queue.declare'{queue = QName, + durable = Durable, + passive = true, + auto_delete = false, + arguments = Args})). + +%%%%%%%%%%%%%%%%%%%%%%%% +%% Test helpers +%%%%%%%%%%%%%%%%%%%%%%%% +declare_queue(Ch, Config, QName) -> + Args = ?config(queue_args, Config), + Durable = ?config(queue_durable, Config), + #'queue.declare_ok'{} = amqp_channel:call(Ch, #'queue.declare'{queue = QName, + arguments = Args, + durable = Durable}). + +publish(Ch, QName, Payloads) -> + [amqp_channel:call(Ch, #'basic.publish'{routing_key = QName}, #amqp_msg{payload = Payload}) + || Payload <- Payloads]. + +consume(Ch, QName, Payloads) -> + consume(Ch, QName, false, Payloads). + +consume(Ch, QName, NoAck, Payloads) -> + [begin + {#'basic.get_ok'{delivery_tag = DTag}, #amqp_msg{payload = Payload}} = + amqp_channel:call(Ch, #'basic.get'{queue = QName, + no_ack = NoAck}), + DTag + end || Payload <- Payloads]. + +consume_empty(Ch, QName) -> + ?assertMatch(#'basic.get_empty'{}, + amqp_channel:call(Ch, #'basic.get'{queue = QName})). + +subscribe(Ch, Queue, NoAck) -> + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Queue, + no_ack = NoAck, + consumer_tag = <<"ctag">>}, + self()), + receive + #'basic.consume_ok'{consumer_tag = <<"ctag">>} -> + ok + end. + +receive_basic_deliver(Redelivered) -> + receive + {#'basic.deliver'{redelivered = R}, _} when R == Redelivered -> + ok + end. diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 02afba97c5..e9f49039ed 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -93,26 +93,7 @@ all_tests() -> restart_queue, restart_all_types, stop_start_rabbit_app, - publish, publish_and_restart, - consume, - consume_first_empty, - consume_from_empty_queue, - consume_and_autoack, - subscribe, - subscribe_with_autoack, - consume_and_ack, - consume_and_multiple_ack, - subscribe_and_ack, - subscribe_and_multiple_ack, - consume_and_requeue_nack, - consume_and_requeue_multiple_nack, - subscribe_and_requeue_nack, - subscribe_and_requeue_multiple_nack, - consume_and_nack, - consume_and_multiple_nack, - subscribe_and_nack, - subscribe_and_multiple_nack, subscribe_should_fail_when_global_qos_true, dead_letter_to_classic_queue, dead_letter_to_quorum_queue, @@ -120,14 +101,10 @@ all_tests() -> dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, - basic_cancel, - purge, sync_queue, cancel_sync_queue, - basic_recover, idempotent_recover, vhost_with_quorum_queue_is_deleted, - delete_immediately, delete_immediately_by_resource, consume_redelivery_count, subscribe_redelivery_count, @@ -612,18 +589,6 @@ stop_start_rabbit_app(Config) -> amqp_channel:call(Ch2, #'basic.get'{queue = CQ2, no_ack = false}), delete_queues(Ch2, [QQ1, QQ2, CQ1, CQ2]). -publish(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - publish(Ch, QQ), - Name = ra_name(QQ), - wait_for_messages_ready(Servers, Name, 1), - wait_for_messages_pending_ack(Servers, Name, 0). - publish_confirm(Ch, QName) -> publish(Ch, QName), amqp_channel:register_confirm_handler(Ch, self()), @@ -660,41 +625,6 @@ publish_and_restart(Config) -> wait_for_messages_ready(Servers, RaName, 2), wait_for_messages_pending_ack(Servers, RaName, 0). -consume(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_first_empty(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - consume_empty(Ch, QQ, false), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, false), - rabbit_ct_client_helpers:close_channel(Ch). - consume_in_minority(Config) -> [Server0, Server1, Server2] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -711,63 +641,6 @@ consume_in_minority(Config) -> amqp_channel:call(Ch, #'basic.get'{queue = QQ, no_ack = false})). -consume_and_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - consume(Ch, QQ, true), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_from_empty_queue(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - consume_empty(Ch, QQ, false). - -subscribe(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - qos(Ch, 10, false), - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - %% validate we can retrieve the consumers - [Consumer] = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), - ct:pal("Consumer ~p", [Consumer]), - ?assert(is_pid(proplists:get_value(channel_pid, Consumer))), - ?assert(is_binary(proplists:get_value(consumer_tag, Consumer))), - ?assertEqual(true, proplists:get_value(ack_required, Consumer)), - ?assertEqual(10, proplists:get_value(prefetch_count, Consumer)), - ?assertEqual([], proplists:get_value(arguments, Consumer)), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - subscribe_should_fail_when_global_qos_true(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -789,338 +662,6 @@ subscribe_should_fail_when_global_qos_true(Config) -> end, ok. -subscribe_with_autoack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, true), - receive_basic_deliver(false), - receive_basic_deliver(false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0), - rabbit_ct_client_helpers:close_channel(Ch), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_ack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -consume_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -consume_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - _ = consume(Ch, QQ, false), - DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0). - -subscribe_and_requeue_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = true}), - receive_basic_deliver(true), - receive_basic_deliver(true), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1, - multiple = true}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_requeue_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = true}), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag1, - redelivered = true}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag1}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end - end. - -subscribe_and_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = false, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - -subscribe_and_multiple_nack(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 3), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive_basic_deliver(false), - receive_basic_deliver(false), - receive - {#'basic.deliver'{delivery_tag = DeliveryTag, - redelivered = false}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 3), - amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, - multiple = true, - requeue = false}), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 0) - end. - dead_letter_to_classic_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1500,51 +1041,6 @@ delete_declare(Config) -> wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_cancel(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - subscribe(Ch, QQ, false), - receive - {#'basic.deliver'{}, _} -> - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = <<"ctag">>}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - [] = rpc:call(Server, ets, tab2list, [consumer_created]) - after 5000 -> - exit(basic_deliver_timeout) - end. - -purge(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 2), - wait_for_messages_pending_ack(Servers, RaName, 0), - _DeliveryTag = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 1), - {'queue.purge_ok', 1} = amqp_channel:call(Ch, #'queue.purge'{queue = QQ}), - wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages_ready(Servers, RaName, 0). - sync_queue(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1922,45 +1418,6 @@ reconnect_consumer_and_wait_channel_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). -basic_recover(Config) -> - [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), - - RaName = ra_name(QQ), - publish(Ch, QQ), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0), - _ = consume(Ch, QQ, false), - wait_for_messages_ready(Servers, RaName, 0), - wait_for_messages_pending_ack(Servers, RaName, 1), - amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), - wait_for_messages_ready(Servers, RaName, 1), - wait_for_messages_pending_ack(Servers, RaName, 0). - -delete_immediately(Config) -> - Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), - - Ch = rabbit_ct_client_helpers:open_channel(Config, Server), - QQ = ?config(queue_name, Config), - Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - declare(Ch, QQ, Args)), - - Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], - {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), - ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), - - ?assertEqual({'queue.declare_ok', QQ, 0, 0}, - amqp_channel:call(Ch, #'queue.declare'{queue = QQ, - durable = true, - passive = true, - auto_delete = false, - arguments = Args})). - delete_immediately_by_resource(Config) -> Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), Ch = rabbit_ct_client_helpers:open_channel(Config, Server), @@ -2046,7 +1503,7 @@ consume_redelivery_count(Config) -> amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = true}), - %% wait for requeueing + %% wait for requeuing timer:sleep(500), {#'basic.get_ok'{delivery_tag = DeliveryTag1, diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index 293a597d14..60402b3a7b 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -104,7 +104,7 @@ basics(Config) -> exit(await_msg_timeout) end, - % process settle applied notificaiton + % process settle applied notification FState5b = process_ra_event(FState5, 250), _ = ra:stop_server(ServerId), _ = ra:restart_server(ServerId), @@ -395,7 +395,8 @@ cancel_checkout(Config) -> {ok, F2} = rabbit_fifo_client:checkout(<<"tag">>, 10, undefined, F1), {_, _, F3} = process_ra_events0(F2, [], [], 250, fun (_, S) -> S end), {ok, F4} = rabbit_fifo_client:cancel_checkout(<<"tag">>, F3), - {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F4), + {ok, F5} = rabbit_fifo_client:return(<<"tag">>, [0], F4), + {ok, {{_, {_, m1}}, _}, _} = rabbit_fifo_client:dequeue(<<"d1">>, settled, F5), ok. credit(Config) -> diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl index c4f5690b72..437cd02e25 100644 --- a/test/rabbit_fifo_prop_SUITE.erl +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -321,7 +321,7 @@ checkout_gen(Pid) -> }). expand(Ops) -> - %% execute each command against a rabbit_fifo state and capture all releavant + %% execute each command against a rabbit_fifo state and capture all relevant %% effects T = #t{}, #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), @@ -463,7 +463,7 @@ run_proper(Fun, Args, NumTests) -> end}])). run_snapshot_test(Conf, Commands) -> - %% create every incremental permuation of the commands lists + %% create every incremental permutation of the commands lists %% and run the snapshot tests against that [begin % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), diff --git a/test/rabbit_ha_test_consumer.erl b/test/rabbit_ha_test_consumer.erl index 8d61903308..85f0a17b2a 100644 --- a/test/rabbit_ha_test_consumer.erl +++ b/test/rabbit_ha_test_consumer.erl @@ -64,7 +64,7 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) -> CancelOnFailover, MsgNum, MsgsToConsume - 1); MsgNum >= LowestSeen -> error_logger:info_msg( - "consumer ~p on ~p ignoring redeliverd msg ~p~n", + "consumer ~p on ~p ignoring redelivered msg ~p~n", [self(), Channel, MsgNum]), true = Redelivered, %% ASSERTION run(TestPid, Channel, Queue, diff --git a/test/rabbitmqctl_shutdown_SUITE.erl b/test/rabbitmqctl_shutdown_SUITE.erl index 0debfde2b6..b4279c6032 100644 --- a/test/rabbitmqctl_shutdown_SUITE.erl +++ b/test/rabbitmqctl_shutdown_SUITE.erl @@ -111,7 +111,7 @@ node_is_running(Node) -> shutdown_ok(Node) -> %% Start a command {stream, Stream} = rabbit_ct_broker_helpers:control_action(shutdown, Node, []), - %% Execute command steps. Each step will ouput a binary string + %% Execute command steps. Each step will output a binary string Lines = 'Elixir.Enum':to_list(Stream), ct:pal("Command output ~p ~n", [Lines]), [true = is_binary(Line) || Line <- Lines], diff --git a/test/single_active_consumer_SUITE.erl b/test/single_active_consumer_SUITE.erl index 6071aeb5a5..0b12f54c0b 100644 --- a/test/single_active_consumer_SUITE.erl +++ b/test/single_active_consumer_SUITE.erl @@ -33,12 +33,14 @@ groups() -> all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks, amqp_exclusive_consume_fails_on_exclusive_consumer_queue ]}, {quorum_queue, [], [ all_messages_go_to_one_consumer, fallback_to_another_consumer_when_first_one_is_cancelled, - fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled, + fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks %% amqp_exclusive_consume_fails_on_exclusive_consumer_queue % Exclusive consume not implemented in QQ ]} ]. @@ -131,7 +133,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(1, MessageCount div 2)], - {MessagesPerConsumer1, _} = wait_for_messages(MessageCount div 2), + {ok, {MessagesPerConsumer1, _}} = wait_for_messages(MessageCount div 2), FirstActiveConsumerInList = maps:keys(maps:filter(fun(_CTag, Count) -> Count > 0 end, MessagesPerConsumer1)), ?assertEqual(1, length(FirstActiveConsumerInList)), @@ -141,8 +143,8 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> {cancel_ok, FirstActiveConsumer} = wait_for_cancel_ok(), [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}) || _X <- lists:seq(MessageCount div 2 + 1, MessageCount - 1)], - - {MessagesPerConsumer2, _} = wait_for_messages(MessageCount div 2 - 1), + + {ok, {MessagesPerConsumer2, _}} = wait_for_messages(MessageCount div 2 - 1), SecondActiveConsumerInList = maps:keys(maps:filter( fun(CTag, Count) -> Count > 0 andalso CTag /= FirstActiveConsumer end, MessagesPerConsumer2) @@ -153,7 +155,7 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = SecondActiveConsumer}), amqp_channel:cast(Ch, Publish, #amqp_msg{payload = <<"foobar">>}), - wait_for_messages(1), + ?assertMatch({ok, _}, wait_for_messages(1)), LastActiveConsumer = lists:nth(1, lists:delete(FirstActiveConsumer, lists:delete(SecondActiveConsumer, [CTag1, CTag2, CTag3]))), @@ -171,6 +173,54 @@ fallback_to_another_consumer_when_first_one_is_cancelled(Config) -> amqp_connection:close(C), ok. +fallback_to_another_consumer_when_first_one_is_cancelled_manual_acks(Config) -> + %% Let's ensure that although the consumer is cancelled we still keep the unacked + %% messages and accept acknowledgments on them. + [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + {C, Ch} = connection_and_channel(Config), + Q = queue_declare(Ch, Config), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + #'basic.consume_ok'{} = + amqp_channel:subscribe(Ch, #'basic.consume'{queue = Q, no_ack = false}, self()), + Consumers0 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers0)), + + Publish = #'basic.publish'{exchange = <<>>, routing_key = Q}, + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg1">>, <<"msg2">>]], + + {CTag, DTag1} = receive_deliver(), + {_CTag, DTag2} = receive_deliver(), + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + #'basic.cancel_ok'{} = amqp_channel:call(Ch, #'basic.cancel'{consumer_tag = CTag}), + + receive + #'basic.cancel_ok'{consumer_tag = CTag} -> + ok + end, + Consumers1 = rpc:call(Server, rabbit_amqqueue, consumers_all, [<<"/">>]), + ?assertMatch([_, _], lists:filter(fun(Props) -> + Resource = proplists:get_value(queue_name, Props), + Q == Resource#resource.name + end, Consumers1)), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + [amqp_channel:cast(Ch, Publish, #amqp_msg{payload = P}) || P <- [<<"msg3">>, <<"msg4">>]], + + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"4">>, <<"0">>, <<"4">>]]), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag1}), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DTag2}), + quorum_queue_utils:wait_for_messages(Config, [[Q, <<"2">>, <<"0">>, <<"2">>]]), + + amqp_connection:close(C), + ok. + fallback_to_another_consumer_when_exclusive_consumer_channel_is_cancelled(Config) -> {C, Ch} = connection_and_channel(Config), {C1, Ch1} = connection_and_channel(Config), @@ -276,13 +326,13 @@ wait_for_messages(ExpectedCount) -> wait_for_messages(ExpectedCount, {}). wait_for_messages(0, State) -> - State; -wait_for_messages(ExpectedCount, _) -> + {ok, State}; +wait_for_messages(ExpectedCount, State) -> receive {message, {MessagesPerConsumer, MessageCount}} -> wait_for_messages(ExpectedCount - 1, {MessagesPerConsumer, MessageCount}) after 5000 -> - throw(message_waiting_timeout) + {missing, ExpectedCount, State} end. wait_for_cancel_ok() -> @@ -292,3 +342,12 @@ wait_for_cancel_ok() -> after 5000 -> throw(consumer_cancel_ok_timeout) end. + +receive_deliver() -> + receive + {#'basic.deliver'{consumer_tag = CTag, + delivery_tag = DTag}, _} -> + {CTag, DTag} + after 5000 -> + exit(deliver_timeout) + end. diff --git a/test/unit_inbroker_non_parallel_SUITE.erl b/test/unit_inbroker_non_parallel_SUITE.erl index d2db382e30..aaded5fa99 100644 --- a/test/unit_inbroker_non_parallel_SUITE.erl +++ b/test/unit_inbroker_non_parallel_SUITE.erl @@ -338,7 +338,7 @@ log_management_during_startup1(_Config) -> application:unset_env(lager, extra_sinks), ok = try rabbit:start() of ok -> exit({got_success_but_expected_failure, - log_rotatation_parent_dirs_test}) + log_rotation_parent_dirs_test}) catch _:{error, {cannot_log_to_file, _, Reason2}} when Reason2 =:= enoent orelse Reason2 =:= eacces -> ok; diff --git a/test/unit_inbroker_parallel_SUITE.erl b/test/unit_inbroker_parallel_SUITE.erl index 364bd5aabb..9518e06196 100644 --- a/test/unit_inbroker_parallel_SUITE.erl +++ b/test/unit_inbroker_parallel_SUITE.erl @@ -1384,7 +1384,7 @@ max_message_size(Config) -> {_, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), - %% Binary is whithin the max size limit + %% Binary is within the max size limit amqp_channel:call(Ch, #'basic.publish'{routing_key = <<"none">>}, #amqp_msg{payload = Binary2M}), %% The channel process is alive assert_channel_alive(Ch), diff --git a/test/unit_log_config_SUITE.erl b/test/unit_log_config_SUITE.erl index 584aa76760..07134f309a 100644 --- a/test/unit_log_config_SUITE.erl +++ b/test/unit_log_config_SUITE.erl @@ -660,7 +660,7 @@ env_var_tty(_) -> application:set_env(rabbit, lager_log_root, "/tmp/log_base"), application:set_env(rabbit, lager_default_file, tty), application:set_env(rabbit, lager_upgrade_file, tty), - %% tty can only be set explicitely + %% tty can only be set explicitly os:putenv("RABBITMQ_LOGS_source", "environment"), rabbit_lager:configure_lager(), diff --git a/test/worker_pool_SUITE.erl b/test/worker_pool_SUITE.erl index 8c9c5fa366..e53abdfa05 100644 --- a/test/worker_pool_SUITE.erl +++ b/test/worker_pool_SUITE.erl @@ -142,7 +142,7 @@ cancel_timeout(_) -> reuse), timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) after 0 -> ok end. @@ -179,7 +179,7 @@ cancel_timeout_by_setting(_) -> reuse), timer:sleep(1000), - receive {hello, Worker, Test} -> exit(timeout_is_not_canceleld) + receive {hello, Worker, Test} -> exit(timeout_is_not_cancelled) after 0 -> ok end, |
