diff options
31 files changed, 1668 insertions, 339 deletions
@@ -268,6 +268,7 @@ web-manpages: $(WEB_MANPAGES) gsub(/<h1/, "<h2", line); \ gsub(/<\/h1>/, "</h2>", line); \ gsub(/class="D1"/, "class=\"D1 sourcecode bash hljs\"", line); \ + gsub(/class="Bd Bd-indent"/, "class=\"Bd Bd-indent sourcecode bash hljs\"", line); \ print line; \ } } \ ' > "$@" @@ -16,10 +16,12 @@ ## Tutorials & Documentation * [RabbitMQ tutorials](https://rabbitmq.com/getstarted.html) - * [Documentation guides](https://rabbitmq.com/documentation.html) - * [Documentation Source Code](https://github.com/rabbitmq/rabbitmq-website/) + * [All documentation guides](https://rabbitmq.com/documentation.html) + * [Documentation source](https://github.com/rabbitmq/rabbitmq-website/) * [Client libraries and tools](https://rabbitmq.com/devtools.html) - * [Tutorials Source Code](https://github.com/rabbitmq/rabbitmq-tutorials/) + * [Monitoring guide](https://rabbitmq.com/monitoring.html) + * [Production checklist](https://rabbitmq.com/production-checklist.html) + * [Runnable tutorials](https://github.com/rabbitmq/rabbitmq-tutorials/) ## Getting Help @@ -33,6 +35,8 @@ See [CONTRIBUTING.md](./CONTRIBUTING.md) and our [development process overview](https://rabbitmq.com/github.html). +Questions about contributing, internals and so on are very welcome on the [mailing list](https://groups.google.com/forum/#!forum/rabbitmq-users). + ## Licensing diff --git a/docs/README.md b/docs/README.md index 74bce06870..7efad23cf1 100644 --- a/docs/README.md +++ b/docs/README.md @@ -8,3 +8,20 @@ This directory contains [CLI tool](https://rabbitmq.com/cli.html) man page sourc * A [systemd unit file example](./rabbitmq-server.service.example) Please [see rabbitmq.com](https://rabbitmq.com/documentation.html) for documentation guides. + +## man Pages + +### Source Files + +This directory contains man pages that are are converted to HTML using `mandoc`: + + gmake web-manpages + +The result is then copied to the [website repository](https://github.com/rabbitmq/rabbitmq-website/tree/live/site/man) + +### Contributions + +Since deployed man pages are generated, it is important to keep them in sync with the source. +Accepting community contributions — which will always come as website pull requests — +is fine but the person who merges them is responsible for backporting all changes +to the source pages in this repo. diff --git a/docs/rabbitmq-server.8 b/docs/rabbitmq-server.8 index 84772aa042..4c469d25d0 100644 --- a/docs/rabbitmq-server.8 +++ b/docs/rabbitmq-server.8 @@ -74,7 +74,7 @@ Defaults to 5672. .Sh OPTIONS .\" ------------------------------------------------------------------ .Bl -tag -width Ds -.It Fl -detached +.It Fl detached Start the server process in the background. Note that this will cause the pid not to be written to the pid file. .Pp diff --git a/docs/rabbitmq.conf.example b/docs/rabbitmq.conf.example index 2a373b2eb3..a62ed38291 100644 --- a/docs/rabbitmq.conf.example +++ b/docs/rabbitmq.conf.example @@ -503,8 +503,17 @@ # Kernel section # ====================================== +## Timeout used to detect peer unavailability, including CLI tools. +## Related doc guide: https://www.rabbitmq.com/nettick.html. +## # net_ticktime = 60 +## Inter-node communication port range. +## Related doc guide: https://www.rabbitmq.com/networking.html#epmd-inet-dist-port-range. +## +# inet_dist_listen_min = 25672 +# inet_dist_listen_max = 25692 + ## ---------------------------------------------------------------------------- ## RabbitMQ Management Plugin ## diff --git a/docs/rabbitmqctl.8 b/docs/rabbitmqctl.8 index c99170d175..524da897d1 100644 --- a/docs/rabbitmqctl.8 +++ b/docs/rabbitmqctl.8 @@ -73,6 +73,11 @@ for details of configuring the RabbitMQ broker. .It Fl q , -quiet Quiet output mode is selected. Informational messages are suppressed when quiet mode is in effect. +.It Fl s , -silent +Silent output mode is selected. +Informational messages and table headers are suppressed when silent mode is in effect. +.It Fl -no-table-headers +Do not output headers for tabular data. .It Fl -dry-run Do not run the command. Only print information message. diff --git a/priv/schema/rabbit.schema b/priv/schema/rabbit.schema index 9cee795296..5c6078a413 100644 --- a/priv/schema/rabbit.schema +++ b/priv/schema/rabbit.schema @@ -1342,6 +1342,16 @@ end}. {validators, ["non_zero_positive_integer"]} ]}. +{mapping, "inet_dist_listen_min", "kernel.inet_dist_listen_min",[ + {datatype, [integer]}, + {validators, ["non_zero_positive_integer"]} +]}. + +{mapping, "inet_dist_listen_max", "kernel.inet_dist_listen_max",[ + {datatype, [integer]}, + {validators, ["non_zero_positive_integer"]} +]}. + % =============================== % Validators % =============================== diff --git a/rabbitmq-components.mk b/rabbitmq-components.mk index b440169d0d..1bec3c0942 100644 --- a/rabbitmq-components.mk +++ b/rabbitmq-components.mk @@ -50,7 +50,6 @@ dep_rabbitmq_auth_backend_ldap = git_rmq rabbitmq-auth-backend-ldap $(cur dep_rabbitmq_auth_mechanism_ssl = git_rmq rabbitmq-auth-mechanism-ssl $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_aws = git_rmq rabbitmq-aws $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_boot_steps_visualiser = git_rmq rabbitmq-boot-steps-visualiser $(current_rmq_ref) $(base_rmq_ref) master -dep_rabbitmq_clusterer = git_rmq rabbitmq-clusterer $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_cli = git_rmq rabbitmq-cli $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_codegen = git_rmq rabbitmq-codegen $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_consistent_hash_exchange = git_rmq rabbitmq-consistent-hash-exchange $(current_rmq_ref) $(base_rmq_ref) master @@ -70,7 +69,6 @@ dep_rabbitmq_management = git_rmq rabbitmq-management $(current_rm dep_rabbitmq_management_agent = git_rmq rabbitmq-management-agent $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_management_exchange = git_rmq rabbitmq-management-exchange $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_management_themes = git_rmq rabbitmq-management-themes $(current_rmq_ref) $(base_rmq_ref) master -dep_rabbitmq_management_visualiser = git_rmq rabbitmq-management-visualiser $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_message_timestamp = git_rmq rabbitmq-message-timestamp $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_metronome = git_rmq rabbitmq-metronome $(current_rmq_ref) $(base_rmq_ref) master dep_rabbitmq_mqtt = git_rmq rabbitmq-mqtt $(current_rmq_ref) $(base_rmq_ref) master @@ -110,17 +108,14 @@ dep_rabbitmq_public_umbrella = git_rmq rabbitmq-public-umbrella $(curre # all projects use the same versions. It avoids conflicts and makes it # possible to work with rabbitmq-public-umbrella. -dep_cowboy = hex 2.4.0 -dep_cowlib = hex 2.3.0 +dep_cowboy = hex 2.6.1 +dep_cowlib = hex 2.7.0 dep_jsx = hex 2.9.0 dep_lager = hex 3.6.5 dep_ra = git https://github.com/rabbitmq/ra.git master -dep_ranch = hex 1.6.2 -dep_ranch_proxy_protocol = hex 2.1.1 +dep_ranch = hex 1.7.1 dep_recon = hex 2.3.6 -dep_sockjs = git https://github.com/rabbitmq/sockjs-erlang.git 405990ea62353d98d36dbf5e1e64942d9b0a1daf - RABBITMQ_COMPONENTS = amqp_client \ amqp10_common \ amqp10_client \ @@ -134,7 +129,6 @@ RABBITMQ_COMPONENTS = amqp_client \ rabbitmq_auth_mechanism_ssl \ rabbitmq_aws \ rabbitmq_boot_steps_visualiser \ - rabbitmq_clusterer \ rabbitmq_cli \ rabbitmq_codegen \ rabbitmq_consistent_hash_exchange \ @@ -154,7 +148,6 @@ RABBITMQ_COMPONENTS = amqp_client \ rabbitmq_management_agent \ rabbitmq_management_exchange \ rabbitmq_management_themes \ - rabbitmq_management_visualiser \ rabbitmq_message_timestamp \ rabbitmq_metronome \ rabbitmq_mqtt \ diff --git a/scripts/rabbitmq-server b/scripts/rabbitmq-server index d66e2517c6..f0c4030529 100755 --- a/scripts/rabbitmq-server +++ b/scripts/rabbitmq-server @@ -275,17 +275,17 @@ ensure_thread_pool_size() { } start_rabbitmq_server() { - # "-pa ${RABBITMQ_SERVER_CODE_PATH}" should be the very first - # command-line argument. In case of using cached HiPE-compilation, - # this will allow for compiled versions of erlang built-in modules - # (e.g. lists) to be loaded. + # The arguments to -pa are in this order because they are *pre*-pended + # to the code path. Since we want RABBITMQ_SERVER_CODE_PATH to precede + # RABBITMQ_EBIN_ROOT, it must come as the second argument here. + # https://github.com/rabbitmq/rabbitmq-server/issues/1777 ensure_thread_pool_size check_start_params && RABBITMQ_CONFIG_FILE=$RABBITMQ_CONFIG_FILE \ ERL_MAX_ETS_TABLES=$ERL_MAX_ETS_TABLES \ ERL_CRASH_DUMP=$ERL_CRASH_DUMP \ exec ${ERL_DIR}erl \ - -pa ${RABBITMQ_SERVER_CODE_PATH} ${RABBITMQ_EBIN_ROOT} \ + -pa "$RABBITMQ_EBIN_ROOT" "$RABBITMQ_SERVER_CODE_PATH" \ ${RABBITMQ_START_RABBIT} \ ${RABBITMQ_NAME_TYPE} ${RABBITMQ_NODENAME} \ -boot "${SASL_BOOT_FILE}" \ diff --git a/src/lqueue.erl b/src/lqueue.erl index 0652061075..1abe4e0b82 100644 --- a/src/lqueue.erl +++ b/src/lqueue.erl @@ -21,7 +21,7 @@ %% is an O(1) operation, in contrast with queue:len/1 which is O(n). -export([new/0, is_empty/1, len/1, in/2, in_r/2, out/1, out_r/1, join/2, - foldl/3, foldr/3, from_list/1, to_list/1, peek/1, peek_r/1]). + foldl/3, foldr/3, from_list/1, drop/1, to_list/1, peek/1, peek_r/1]). -define(QUEUE, queue). @@ -32,6 +32,7 @@ -type result() :: 'empty' | {'value', value()}. -spec new() -> ?MODULE(). +-spec drop(?MODULE()) -> ?MODULE(). -spec is_empty(?MODULE()) -> boolean(). -spec len(?MODULE()) -> non_neg_integer(). -spec in(value(), ?MODULE()) -> ?MODULE(). @@ -48,6 +49,8 @@ new() -> {0, ?QUEUE:new()}. +drop({L, Q}) -> {L - 1, ?QUEUE:drop(Q)}. + is_empty({0, _Q}) -> true; is_empty(_) -> false. @@ -81,7 +84,8 @@ foldr(Fun, Init, Q) -> {{value, V}, Q1} -> foldr(Fun, Fun(V, Init), Q1) end. -len({L, _Q}) -> L. +len({L, _}) -> L. + peek({ 0, _Q}) -> empty; peek({_L, Q}) -> ?QUEUE:peek(Q). diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl index 64cb3cf790..0f7d569a00 100644 --- a/src/rabbit_amqqueue.erl +++ b/src/rabbit_amqqueue.erl @@ -44,6 +44,7 @@ -export([list_local_followers/0]). -export([ensure_rabbit_queue_record_is_initialized/1]). -export([format/1]). +-export([delete_immediately_by_resource/1]). -export([pid_of/1, pid_of/2]). -export([mark_local_durable_queues_stopped/1]). @@ -266,6 +267,13 @@ filter_per_type(Queues) -> filter_pid_per_type(QPids) -> lists:partition(fun(QPid) -> ?IS_CLASSIC(QPid) end, QPids). +filter_resource_per_type(Resources) -> + Queues = [begin + {ok, #amqqueue{pid = QPid}} = lookup(Resource), + {Resource, QPid} + end || Resource <- Resources], + lists:partition(fun({_Resource, QPid}) -> ?IS_CLASSIC(QPid) end, Queues). + stop(VHost) -> %% Classic queues ok = rabbit_amqqueue_sup_sup:stop_for_vhost(VHost), @@ -912,8 +920,11 @@ list_local(VHostPath) -> [ Q || #amqqueue{state = State, pid = QPid} = Q <- list(VHostPath), State =/= crashed, is_local_to_node(QPid, node()) ]. -notify_policy_changed(#amqqueue{pid = QPid}) -> - gen_server2:cast(QPid, policy_changed). +notify_policy_changed(#amqqueue{pid = QPid}) when ?IS_CLASSIC(QPid) -> + gen_server2:cast(QPid, policy_changed); +notify_policy_changed(#amqqueue{pid = QPid, + name = QName}) when ?IS_QUORUM(QPid) -> + rabbit_quorum_queue:policy_changed(QName, QPid). consumers(#amqqueue{ pid = QPid }) -> delegate:invoke(QPid, {gen_server2, call, [consumers, infinity]}). @@ -961,7 +972,16 @@ delete_exclusive(QPids, ConnId) -> delete_immediately(QPids) -> {Classic, Quorum} = filter_pid_per_type(QPids), [gen_server2:cast(QPid, delete_immediately) || QPid <- Classic], - [rabbit_quorum_queue:delete_immediately(QPid) || QPid <- Quorum], + case Quorum of + [] -> ok; + _ -> {error, cannot_delete_quorum_queues, Quorum} + end. + +delete_immediately_by_resource(Resources) -> + {Classic, Quorum} = filter_resource_per_type(Resources), + [gen_server2:cast(QPid, delete_immediately) || {_, QPid} <- Classic], + [rabbit_quorum_queue:delete_immediately(Resource, QPid) + || {Resource, QPid} <- Quorum], ok. delete(#amqqueue{ type = quorum} = Q, @@ -1098,7 +1118,8 @@ notify_down_all(QPids, ChPid, Timeout) -> Error -> {error, Error} end. -activate_limit_all(QPids, ChPid) -> +activate_limit_all(QRefs, ChPid) -> + QPids = [P || P <- QRefs, ?IS_CLASSIC(P)], delegate:invoke_no_result(QPids, {gen_server2, cast, [{activate_limit, ChPid}]}). diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 606e62af11..5f56955ccc 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -669,44 +669,53 @@ attempt_delivery(Delivery = #delivery{sender = SenderPid, State#q{consumers = Consumers})} end. -maybe_deliver_or_enqueue(Delivery, Delivered, State = #q{overflow = Overflow}) -> +maybe_deliver_or_enqueue(Delivery = #delivery{message = Message}, + Delivered, + State = #q{overflow = Overflow, + backing_queue = BQ, + backing_queue_state = BQS}) -> send_mandatory(Delivery), %% must do this before confirms case {will_overflow(Delivery, State), Overflow} of {true, 'reject-publish'} -> %% Drop publish and nack to publisher send_reject_publish(Delivery, Delivered, State); _ -> - %% Enqueue and maybe drop head later - deliver_or_enqueue(Delivery, Delivered, State) + {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), + State1 = State#q{backing_queue_state = BQS1}, + case IsDuplicate of + true -> State1; + {true, drop} -> State1; + %% Drop publish and nack to publisher + {true, reject} -> + send_reject_publish(Delivery, Delivered, State1); + %% Enqueue and maybe drop head later + false -> + deliver_or_enqueue(Delivery, Delivered, State1) + end end. deliver_or_enqueue(Delivery = #delivery{message = Message, sender = SenderPid, flow = Flow}, - Delivered, State = #q{backing_queue = BQ, - backing_queue_state = BQS}) -> + Delivered, + State = #q{backing_queue = BQ}) -> {Confirm, State1} = send_or_record_confirm(Delivery, State), Props = message_properties(Message, Confirm, State1), - {IsDuplicate, BQS1} = BQ:is_duplicate(Message, BQS), - State2 = State1#q{backing_queue_state = BQS1}, - case IsDuplicate orelse attempt_delivery(Delivery, Props, Delivered, - State2) of - true -> + case attempt_delivery(Delivery, Props, Delivered, State1) of + {delivered, State2} -> State2; - {delivered, State3} -> - State3; %% The next one is an optimisation - {undelivered, State3 = #q{ttl = 0, dlx = undefined, - backing_queue_state = BQS2, + {undelivered, State2 = #q{ttl = 0, dlx = undefined, + backing_queue_state = BQS, msg_id_to_channel = MTC}} -> - {BQS3, MTC1} = discard(Delivery, BQ, BQS2, MTC), - State3#q{backing_queue_state = BQS3, msg_id_to_channel = MTC1}; - {undelivered, State3 = #q{backing_queue_state = BQS2}} -> - - BQS3 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS2), - {Dropped, State4 = #q{backing_queue_state = BQS4}} = - maybe_drop_head(State3#q{backing_queue_state = BQS3}), - QLen = BQ:len(BQS4), + {BQS1, MTC1} = discard(Delivery, BQ, BQS, MTC), + State2#q{backing_queue_state = BQS1, msg_id_to_channel = MTC1}; + {undelivered, State2 = #q{backing_queue_state = BQS}} -> + + BQS1 = BQ:publish(Message, Props, Delivered, SenderPid, Flow, BQS), + {Dropped, State3 = #q{backing_queue_state = BQS2}} = + maybe_drop_head(State2#q{backing_queue_state = BQS1}), + QLen = BQ:len(BQS2), %% optimisation: it would be perfectly safe to always %% invoke drop_expired_msgs here, but that is expensive so %% we only do that if a new message that might have an @@ -715,9 +724,9 @@ deliver_or_enqueue(Delivery = #delivery{message = Message, %% has no expiry and becomes the head of the queue then %% the call is unnecessary. case {Dropped, QLen =:= 1, Props#message_properties.expiry} of - {false, false, _} -> State4; - {true, true, undefined} -> State4; - {_, _, _} -> drop_expired_msgs(State4) + {false, false, _} -> State3; + {true, true, undefined} -> State3; + {_, _, _} -> drop_expired_msgs(State3) end end. @@ -1683,7 +1692,3 @@ update_ha_mode(State) -> {ok, Q} = rabbit_amqqueue:lookup(qname(State)), ok = rabbit_mirror_queue_misc:update_mirrors(Q), State. - - - - diff --git a/src/rabbit_basic.erl b/src/rabbit_basic.erl index d474e9cad3..22ceefb85f 100644 --- a/src/rabbit_basic.erl +++ b/src/rabbit_basic.erl @@ -23,6 +23,7 @@ extract_headers/1, extract_timestamp/1, map_headers/2, delivery/4, header_routes/1, parse_expiration/1, header/2, header/3]). -export([build_content/2, from_content/1, msg_size/1, maybe_gc_large_msg/1]). +-export([add_header/4]). %%---------------------------------------------------------------------------- @@ -300,3 +301,12 @@ maybe_gc_large_msg(Content) -> msg_size(Content) -> rabbit_writer:msg_size(Content). + +add_header(Name, Type, Value, #basic_message{content = Content0} = Msg) -> + Content = rabbit_basic:map_headers( + fun(undefined) -> + rabbit_misc:set_table_value([], Name, Type, Value); + (Headers) -> + rabbit_misc:set_table_value(Headers, Name, Type, Value) + end, Content0), + Msg#basic_message{content = Content}. diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 1b74b655f5..f749d9f30e 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -110,7 +110,7 @@ %% when queue.bind's queue field is empty, %% this name will be used instead most_recently_declared_queue, - %% a map of queue pid to queue name + %% a map of queue ref to queue name queue_names, %% queue processes are monitored to update %% queue names @@ -161,6 +161,7 @@ queue_cleanup_timer }). +-define(QUEUE, lqueue). -define(MAX_PERMISSION_CACHE_SIZE, 12). @@ -339,12 +340,29 @@ list_local() -> info_keys() -> ?INFO_KEYS. info(Pid) -> - gen_server2:call(Pid, info, infinity). + {Timeout, Deadline} = get_operation_timeout_and_deadline(), + try + case gen_server2:call(Pid, {info, Deadline}, Timeout) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end + catch + exit:{timeout, _} -> + rabbit_log:error("Timed out getting channel ~p info", [Pid]), + throw(timeout) + end. info(Pid, Items) -> - case gen_server2:call(Pid, {info, Items}, infinity) of - {ok, Res} -> Res; - {error, Error} -> throw(Error) + {Timeout, Deadline} = get_operation_timeout_and_deadline(), + try + case gen_server2:call(Pid, {{info, Items}, Deadline}, Timeout) of + {ok, Res} -> Res; + {error, Error} -> throw(Error) + end + catch + exit:{timeout, _} -> + rabbit_log:error("Timed out getting channel ~p info", [Pid]), + throw(timeout) end. info_all() -> @@ -433,7 +451,7 @@ init([Channel, ReaderPid, WriterPid, ConnPid, ConnName, Protocol, User, VHost, limiter = Limiter, tx = none, next_tag = 1, - unacked_message_q = queue:new(), + unacked_message_q = ?QUEUE:new(), user = User, virtual_host = VHost, most_recently_declared_queue = <<>>, @@ -493,13 +511,20 @@ prioritise_info(Msg, _Len, _State) -> handle_call(flush, _From, State) -> reply(ok, State); -handle_call(info, _From, State) -> - reply(infos(?INFO_KEYS, State), State); +handle_call({info, Deadline}, _From, State) -> + try + reply({ok, infos(?INFO_KEYS, Deadline, State)}, State) + catch + Error -> + reply({error, Error}, State) + end; -handle_call({info, Items}, _From, State) -> +handle_call({{info, Items}, Deadline}, _From, State) -> try - reply({ok, infos(Items, State)}, State) - catch Error -> reply({error, Error}, State) + reply({ok, infos(Items, Deadline, State)}, State) + catch + Error -> + reply({error, Error}, State) end; handle_call(refresh_config, _From, State = #ch{virtual_host = VHost}) -> @@ -649,8 +674,9 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, State = lists:foldl( fun({MsgId, {MsgHeader, Msg}}, Acc) -> IsDelivered = maps:is_key(delivery_count, MsgHeader), + Msg1 = add_delivery_count_header(MsgHeader, Msg), handle_deliver(CTag, AckRequired, - {QName, From, MsgId, IsDelivered, Msg}, + {QName, From, MsgId, IsDelivered, Msg1}, Acc) end, State0#ch{queue_states = maps:put(Name, QState2, QueueStates)}, Msgs), noreply(State); @@ -669,19 +695,20 @@ handle_info({ra_event, {Name, _} = From, _} = Evt, #'basic.credit_drained'{consumer_tag = CTag, credit_drained = Credit}) end, Actions), - noreply_coalesce(confirm(MsgSeqNos, From, State)); + noreply_coalesce(confirm(MsgSeqNos, Name, State)); eol -> - State1 = handle_consuming_queue_down_or_eol(From, State0), - State2 = handle_delivering_queue_down(From, State1), - {MXs, UC1} = dtree:take(From, State2#ch.unconfirmed), + State1 = handle_consuming_queue_down_or_eol(Name, State0), + State2 = handle_delivering_queue_down(Name, State1), + %% TODO: this should use dtree:take/3 + {MXs, UC1} = dtree:take(Name, State2#ch.unconfirmed), State3 = record_confirms(MXs, State1#ch{unconfirmed = UC1}), - case maps:find(From, QNames) of + case maps:find(Name, QNames) of {ok, QName} -> erase_queue_stats(QName); error -> ok end, noreply_coalesce( State3#ch{queue_states = maps:remove(Name, QueueStates), - queue_names = maps:remove(From, QNames)}) + queue_names = maps:remove(Name, QNames)}) end; _ -> %% the assumption here is that the queue state has been cleaned up and @@ -1179,7 +1206,7 @@ handle_method(#'basic.publish'{exchange = ExchangeNameBin, DQ = {Delivery#delivery{flow = Flow}, QNames}, {noreply, case Tx of none -> deliver_to_queues(DQ, State1); - {Msgs, Acks} -> Msgs1 = queue:in(DQ, Msgs), + {Msgs, Acks} -> Msgs1 = ?QUEUE:in(DQ, Msgs), State1#ch{tx = {Msgs1, Acks}} end}; {error, Reason} -> @@ -1334,13 +1361,14 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, return_ok(State, NoWait, OkMsg); {ok, {Q = #amqqueue{pid = QPid}, _CParams}} -> ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), QCons1 = - case maps:find(QPid, QCons) of + case maps:find(QRef, QCons) of error -> QCons; {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), case gb_sets:is_empty(CTags1) of - true -> maps:remove(QPid, QCons); - false -> maps:put(QPid, CTags1, QCons) + true -> maps:remove(QRef, QCons); + false -> maps:put(QRef, CTags1, QCons) end end, NewState = State#ch{consumer_mapping = ConsumerMapping1, @@ -1386,14 +1414,14 @@ handle_method(#'basic.qos'{global = true, handle_method(#'basic.qos'{global = true, prefetch_count = PrefetchCount}, _, State = #ch{limiter = Limiter, unacked_message_q = UAMQ}) -> - %% TODO queue:len(UAMQ) is not strictly right since that counts + %% TODO ?QUEUE:len(UAMQ) is not strictly right since that counts %% unacked messages from basic.get too. Pretty obscure though. Limiter1 = rabbit_limiter:limit_prefetch(Limiter, - PrefetchCount, queue:len(UAMQ)), + PrefetchCount, ?QUEUE:len(UAMQ)), case ((not rabbit_limiter:is_active(Limiter)) andalso rabbit_limiter:is_active(Limiter1)) of true -> rabbit_amqqueue:activate_limit_all( - consumer_queues(State#ch.consumer_mapping), self()); + consumer_queue_refs(State#ch.consumer_mapping), self()); false -> ok end, {reply, #'basic.qos_ok'{}, State#ch{limiter = Limiter1}}; @@ -1402,7 +1430,7 @@ handle_method(#'basic.recover_async'{requeue = true}, _, State = #ch{unacked_message_q = UAMQ, limiter = Limiter, queue_states = QueueStates0}) -> OkFun = fun () -> ok end, - UAMQL = queue:to_list(UAMQ), + UAMQL = ?QUEUE:to_list(UAMQ), QueueStates = foreach_per_queue( fun ({QPid, CTag}, MsgIds, Acc0) -> @@ -1416,7 +1444,7 @@ handle_method(#'basic.recover_async'{requeue = true}, ok = notify_limiter(Limiter, UAMQL), %% No answer required - basic.recover is the newer, synchronous %% variant of this method - {noreply, State#ch{unacked_message_q = queue:new(), + {noreply, State#ch{unacked_message_q = ?QUEUE:new(), queue_states = QueueStates}}; handle_method(#'basic.recover_async'{requeue = false}, _, _State) -> @@ -1525,7 +1553,7 @@ handle_method(#'tx.commit'{}, _, #ch{tx = none}) -> handle_method(#'tx.commit'{}, _, State = #ch{tx = {Msgs, Acks}, limiter = Limiter}) -> - State1 = rabbit_misc:queue_fold(fun deliver_to_queues/2, State, Msgs), + State1 = queue_fold(fun deliver_to_queues/2, State, Msgs), Rev = fun (X) -> lists:reverse(lists:sort(X)) end, State2 = lists:foldl(fun ({ack, A}, Acc) -> ack(Rev(A), Acc); @@ -1540,7 +1568,7 @@ handle_method(#'tx.rollback'{}, _, #ch{tx = none}) -> handle_method(#'tx.rollback'{}, _, State = #ch{unacked_message_q = UAMQ, tx = {_Msgs, Acks}}) -> AcksL = lists:append(lists:reverse([lists:reverse(L) || {_, L} <- Acks])), - UAMQ1 = queue:from_list(lists:usort(AcksL ++ queue:to_list(UAMQ))), + UAMQ1 = ?QUEUE:from_list(lists:usort(AcksL ++ ?QUEUE:to_list(UAMQ))), {reply, #'tx.rollback_ok'{}, State#ch{unacked_message_q = UAMQ1, tx = new_tx()}}; @@ -1639,25 +1667,26 @@ consumer_monitor(ConsumerTag, State = #ch{consumer_mapping = ConsumerMapping, queue_monitors = QMons, queue_consumers = QCons}) -> - {#amqqueue{pid = QPid}, _} = - maps:get(ConsumerTag, ConsumerMapping), - CTags1 = case maps:find(QPid, QCons) of + {#amqqueue{pid = QPid}, _} = maps:get(ConsumerTag, ConsumerMapping), + QRef = qpid_to_ref(QPid), + CTags1 = case maps:find(QRef, QCons) of {ok, CTags} -> gb_sets:insert(ConsumerTag, CTags); error -> gb_sets:singleton(ConsumerTag) end, - QCons1 = maps:put(QPid, CTags1, QCons), - State#ch{queue_monitors = maybe_monitor(QPid, QMons), + QCons1 = maps:put(QRef, CTags1, QCons), + State#ch{queue_monitors = maybe_monitor(QRef, QMons), queue_consumers = QCons1}. track_delivering_queue(NoAck, QPid, QName, State = #ch{queue_names = QNames, queue_monitors = QMons, delivering_queues = DQ}) -> - State#ch{queue_names = maps:put(QPid, QName, QNames), - queue_monitors = maybe_monitor(QPid, QMons), + QRef = qpid_to_ref(QPid), + State#ch{queue_names = maps:put(QRef, QName, QNames), + queue_monitors = maybe_monitor(QRef, QMons), delivering_queues = case NoAck of true -> DQ; - false -> sets:add_element(QPid, DQ) + false -> sets:add_element(QRef, DQ) end}. handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, @@ -1676,16 +1705,16 @@ handle_publishing_queue_down(QPid, Reason, State = #ch{unconfirmed = UC, handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> error(quorum_queues_should_never_be_monitored). -handle_consuming_queue_down_or_eol(QPid, - State = #ch{queue_consumers = QCons, - queue_names = QNames}) -> - ConsumerTags = case maps:find(QPid, QCons) of +handle_consuming_queue_down_or_eol(QRef, + State = #ch{queue_consumers = QCons, + queue_names = QNames}) -> + ConsumerTags = case maps:find(QRef, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, gb_sets:fold( fun (CTag, StateN = #ch{consumer_mapping = CMap}) -> - QName = maps:get(QPid, QNames), + QName = maps:get(QRef, QNames), case queue_down_consumer_action(CTag, CMap) of remove -> cancel_consumer(CTag, QName, StateN); @@ -1697,7 +1726,7 @@ handle_consuming_queue_down_or_eol(QPid, _ -> cancel_consumer(CTag, QName, StateN) end end - end, State#ch{queue_consumers = maps:remove(QPid, QCons)}, ConsumerTags). + end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags). %% [0] There is a slight danger here that if a queue is deleted and %% then recreated again the reconsume will succeed even though it was @@ -1724,8 +1753,8 @@ queue_down_consumer_action(CTag, CMap) -> _ -> {recover, ConsumeSpec} end. -handle_delivering_queue_down(QPid, State = #ch{delivering_queues = DQ}) -> - State#ch{delivering_queues = sets:del_element(QPid, DQ)}. +handle_delivering_queue_down(QRef, State = #ch{delivering_queues = DQ}) -> + State#ch{delivering_queues = sets:del_element(QRef, DQ)}. binding_action(Fun, SourceNameBin0, DestinationType, DestinationNameBin0, RoutingKey, Arguments, VHostPath, ConnPid, @@ -1831,28 +1860,28 @@ record_sent(Type, Tag, AckRequired, end, rabbit_trace:tap_out(Msg, ConnName, ChannelNum, Username, TraceState), UAMQ1 = case AckRequired of - true -> queue:in({DeliveryTag, Tag, {QPid, MsgId}}, - UAMQ); + true -> ?QUEUE:in({DeliveryTag, Tag, {QPid, MsgId}}, + UAMQ); false -> UAMQ end, State#ch{unacked_message_q = UAMQ1, next_tag = DeliveryTag + 1}. %% NB: returns acks in youngest-first order collect_acks(Q, 0, true) -> - {lists:reverse(queue:to_list(Q)), queue:new()}; + {lists:reverse(?QUEUE:to_list(Q)), ?QUEUE:new()}; collect_acks(Q, DeliveryTag, Multiple) -> collect_acks([], [], Q, DeliveryTag, Multiple). collect_acks(ToAcc, PrefixAcc, Q, DeliveryTag, Multiple) -> - case queue:out(Q) of + case ?QUEUE:out(Q) of {{value, UnackedMsg = {CurrentDeliveryTag, _ConsumerTag, _Msg}}, QTail} -> if CurrentDeliveryTag == DeliveryTag -> {[UnackedMsg | ToAcc], case PrefixAcc of [] -> QTail; - _ -> queue:join( - queue:from_list(lists:reverse(PrefixAcc)), + _ -> ?QUEUE:join( + ?QUEUE:from_list(lists:reverse(PrefixAcc)), QTail) end}; Multiple -> @@ -1880,7 +1909,7 @@ ack(Acked, State = #ch{queue_names = QNames, State#ch{queue_states = QueueStates}. incr_queue_stats(QPid, QNames, MsgIds, State) -> - case maps:find(QPid, QNames) of + case maps:find(qpid_to_ref(QPid), QNames) of {ok, QName} -> Count = length(MsgIds), ?INCR_STATS(queue_stats, QName, Count, ack, State); error -> ok @@ -1898,16 +1927,16 @@ incr_queue_stats(QPid, QNames, MsgIds, State) -> %% (reject w requeue), 'false' (reject w/o requeue). The msg ids, as %% well as the list overall, are in "most-recent (generally youngest) %% ack first" order. -new_tx() -> {queue:new(), []}. +new_tx() -> {?QUEUE:new(), []}. notify_queues(State = #ch{state = closing}) -> {ok, State}; notify_queues(State = #ch{consumer_mapping = Consumers, delivering_queues = DQ }) -> - QPids0 = sets:to_list( - sets:union(sets:from_list(consumer_queues(Consumers)), DQ)), + QRefs0 = sets:to_list( + sets:union(sets:from_list(consumer_queue_refs(Consumers)), DQ)), %% filter to only include pids to avoid trying to notify quorum queues - QPids = [P || P <- QPids0, ?IS_CLASSIC(P)], + QPids = [P || P <- QRefs0, ?IS_CLASSIC(P)], Timeout = get_operation_timeout(), {rabbit_amqqueue:notify_down_all(QPids, self(), Timeout), State#ch{state = closing}}. @@ -1923,8 +1952,8 @@ foreach_per_queue(F, UAL, Acc) -> end, gb_trees:empty(), UAL), rabbit_misc:gb_trees_fold(fun (Key, Val, Acc0) -> F(Key, Val, Acc0) end, Acc, T). -consumer_queues(Consumers) -> - lists:usort([QPid || {_Key, {#amqqueue{pid = QPid}, _CParams}} +consumer_queue_refs(Consumers) -> + lists:usort([qpid_to_ref(QPid) || {_Key, {#amqqueue{pid = QPid}, _CParams}} <- maps:to_list(Consumers)]). %% tell the limiter about the number of acks that have been received @@ -1967,7 +1996,7 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ Qs = rabbit_amqqueue:lookup(DelQNames), {DeliveredQPids, DeliveredQQPids, QueueStates} = rabbit_amqqueue:deliver(Qs, Delivery, QueueStates0), - AllDeliveredQPids = DeliveredQPids ++ DeliveredQQPids, + AllDeliveredQRefs = DeliveredQPids ++ [N || {N, _} <- DeliveredQQPids], %% The maybe_monitor_all/2 monitors all queues to which we %% delivered. But we want to monitor even queues we didn't deliver %% to, since we need their 'DOWN' messages to clean @@ -1981,49 +2010,50 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ {QNames1, QMons1} = lists:foldl(fun (#amqqueue{pid = QPid, name = QName}, {QNames0, QMons0}) -> - {case maps:is_key(QPid, QNames0) of + QRef = qpid_to_ref(QPid), + {case maps:is_key(QRef, QNames0) of true -> QNames0; - false -> maps:put(QPid, QName, QNames0) + false -> maps:put(QRef, QName, QNames0) end, maybe_monitor(QPid, QMons0)} end, {QNames, maybe_monitor_all(DeliveredQPids, QMons)}, Qs), State1 = State#ch{queue_names = QNames1, queue_monitors = QMons1}, %% NB: the order here is important since basic.returns must be %% sent before confirms. - State2 = process_routing_mandatory(Mandatory, AllDeliveredQPids, MsgSeqNo, + State2 = process_routing_mandatory(Mandatory, AllDeliveredQRefs , MsgSeqNo, Message, State1), - State3 = process_routing_confirm( Confirm, AllDeliveredQPids, MsgSeqNo, - XName, State2), + State3 = process_routing_confirm(Confirm, AllDeliveredQRefs , MsgSeqNo, + XName, State2), case rabbit_event:stats_level(State3, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, {QName, XName}, 1, publish) || - QPid <- AllDeliveredQPids, - {ok, QName} <- [maps:find(QPid, QNames1)]]; + QRef <- AllDeliveredQRefs, + {ok, QName} <- [maps:find(QRef, QNames1)]]; _ -> ok end, State3#ch{queue_states = QueueStates}. -process_routing_mandatory(false, _, _MsgSeqNo, _Msg, State) -> +process_routing_mandatory(false, _, _, _, State) -> State; -process_routing_mandatory(true, [], _MsgSeqNo, Msg, State) -> +process_routing_mandatory(true, [], _, Msg, State) -> ok = basic_return(Msg, State, no_route), State; -process_routing_mandatory(true, QPids, MsgSeqNo, Msg, State) -> - State#ch{mandatory = dtree:insert(MsgSeqNo, QPids, Msg, +process_routing_mandatory(true, QRefs, MsgSeqNo, Msg, State) -> + State#ch{mandatory = dtree:insert(MsgSeqNo, QRefs, Msg, State#ch.mandatory)}. -process_routing_confirm(false, _, _MsgSeqNo, _XName, State) -> +process_routing_confirm(false, _, _, _, State) -> State; -process_routing_confirm(true, [], MsgSeqNo, XName, State) -> +process_routing_confirm(true, [], MsgSeqNo, XName, State) -> record_confirms([{MsgSeqNo, XName}], State); -process_routing_confirm(true, QPids, MsgSeqNo, XName, State) -> - State#ch{unconfirmed = dtree:insert(MsgSeqNo, QPids, XName, +process_routing_confirm(true, QRefs, MsgSeqNo, XName, State) -> + State#ch{unconfirmed = dtree:insert(MsgSeqNo, QRefs, XName, State#ch.unconfirmed)}. -confirm(MsgSeqNos, QPid, State = #ch{unconfirmed = UC}) -> - {MXs, UC1} = dtree:take(MsgSeqNos, QPid, UC), +confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> + {MXs, UC1} = dtree:take(MsgSeqNos, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. record_confirms(MXs, State#ch{unconfirmed = UC1}). @@ -2129,6 +2159,17 @@ complete_tx(State = #ch{tx = failed}) -> infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. +infos(Items, Deadline, State) -> + [begin + Now = now_millis(), + if + Now > Deadline -> + throw(timeout); + true -> + {Item, i(Item, State)} + end + end || Item <- Items]. + i(pid, _) -> self(); i(connection, #ch{conn_pid = ConnPid}) -> ConnPid; i(number, #ch{channel = Channel}) -> Channel; @@ -2140,8 +2181,8 @@ i(confirm, #ch{confirm_enabled = CE}) -> CE; i(name, State) -> name(State); i(consumer_count, #ch{consumer_mapping = CM}) -> maps:size(CM); i(messages_unconfirmed, #ch{unconfirmed = UC}) -> dtree:size(UC); -i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> queue:len(UAMQ); -i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> queue:len(Msgs); +i(messages_unacknowledged, #ch{unacked_message_q = UAMQ}) -> ?QUEUE:len(UAMQ); +i(messages_uncommitted, #ch{tx = {Msgs, _Acks}}) -> ?QUEUE:len(Msgs); i(messages_uncommitted, #ch{}) -> 0; i(acks_uncommitted, #ch{tx = {_Msgs, Acks}}) -> ack_len(Acks); i(acks_uncommitted, #ch{}) -> 0; @@ -2488,3 +2529,28 @@ maybe_monitor(_, QMons) -> maybe_monitor_all([], S) -> S; %% optimisation maybe_monitor_all([Item], S) -> maybe_monitor(Item, S); %% optimisation maybe_monitor_all(Items, S) -> lists:foldl(fun maybe_monitor/2, S, Items). + +add_delivery_count_header(MsgHeader, Msg) -> + Count = maps:get(delivery_count, MsgHeader, 0), + rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg). + +qpid_to_ref(Pid) when is_pid(Pid) -> Pid; +qpid_to_ref({Name, _}) -> Name; +%% assume it already is a ref +qpid_to_ref(Ref) -> Ref. + +now_millis() -> + erlang:monotonic_time(millisecond). + +get_operation_timeout_and_deadline() -> + % NB: can't use get_operation_timeout because + % this code may not be running via the channel Pid + Timeout = ?CHANNEL_OPERATION_TIMEOUT, + Deadline = now_millis() + Timeout, + {Timeout, Deadline}. + +queue_fold(Fun, Init, Q) -> + case ?QUEUE:out(Q) of + {empty, _Q} -> Init; + {{value, V}, Q1} -> queue_fold(Fun, Fun(V, Init), Q1) + end. diff --git a/src/rabbit_connection_sup.erl b/src/rabbit_connection_sup.erl index 4dcfa8dc8a..7ec5262448 100644 --- a/src/rabbit_connection_sup.erl +++ b/src/rabbit_connection_sup.erl @@ -42,7 +42,7 @@ %%-------------------------------------------------------------------------- -start_link(Ref, Sock, _Transport, _Opts) -> +start_link(Ref, _Sock, _Transport, _Opts) -> {ok, SupPid} = supervisor2:start_link(?MODULE, []), %% We need to get channels in the hierarchy here so they get shut %% down after the reader, so the reader gets a chance to terminate @@ -62,7 +62,7 @@ start_link(Ref, Sock, _Transport, _Opts) -> {ok, ReaderPid} = supervisor2:start_child( SupPid, - {reader, {rabbit_reader, start_link, [HelperSup, Ref, Sock]}, + {reader, {rabbit_reader, start_link, [HelperSup, Ref]}, intrinsic, ?WORKER_WAIT, worker, [rabbit_reader]}), {ok, SupPid, ReaderPid}. diff --git a/src/rabbit_fifo.erl b/src/rabbit_fifo.erl index 79d4a3effc..00d0db0b8a 100644 --- a/src/rabbit_fifo.erl +++ b/src/rabbit_fifo.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + -module(rabbit_fifo). -behaviour(ra_machine). @@ -106,7 +122,7 @@ -type applied_mfa() :: {module(), atom(), list()}. % represents a partially applied module call --define(SHADOW_COPY_INTERVAL, 4096). +-define(SHADOW_COPY_INTERVAL, 4096 * 4). -define(USE_AVG_HALF_LIFE, 10000.0). -record(consumer, @@ -147,7 +163,7 @@ next_msg_num = 1 :: msg_in_id(), % list of returned msg_in_ids - when checking out it picks from % this list first before taking low_msg_num - returns = queue:new() :: queue:queue(msg_in_id()), + returns = lqueue:new() :: lqueue:queue(msg_in_id() | '$prefix_msg'), % a counter of enqueues - used to trigger shadow copy points enqueue_count = 0 :: non_neg_integer(), % a map containing all the live processes that have ever enqueued @@ -180,7 +196,8 @@ %% it instead takes messages from the `messages' map. %% This is done so that consumers are still served in a deterministic %% order on recovery. - prefix_msg_count = 0 :: non_neg_integer() + prefix_msg_counts = {0, 0} :: {Return :: non_neg_integer(), + PrefixMsgs :: non_neg_integer()} }). -opaque state() :: #state{}. @@ -207,19 +224,19 @@ -spec init(config()) -> {state(), ra_machine:effects()}. init(#{name := Name} = Conf) -> + update_state(Conf, #state{name = Name}). + +update_state(Conf, State) -> DLH = maps:get(dead_letter_handler, Conf, undefined), CCH = maps:get(cancel_consumer_handler, Conf, undefined), BLH = maps:get(become_leader_handler, Conf, undefined), MH = maps:get(metrics_handler, Conf, undefined), SHI = maps:get(shadow_copy_interval, Conf, ?SHADOW_COPY_INTERVAL), - #state{name = Name, - dead_letter_handler = DLH, - cancel_consumer_handler = CCH, - become_leader_handler = BLH, - metrics_handler = MH, - shadow_copy_interval = SHI}. - - + State#state{dead_letter_handler = DLH, + cancel_consumer_handler = CCH, + become_leader_handler = BLH, + metrics_handler = MH, + shadow_copy_interval = SHI}. % msg_ids are scoped per consumer % ra_indexes holds all raft indexes for enqueues currently on queue @@ -228,9 +245,9 @@ init(#{name := Name} = Conf) -> {state(), ra_machine:effects(), Reply :: term()}. apply(#{index := RaftIdx}, {enqueue, From, Seq, RawMsg}, Effects0, State00) -> case maybe_enqueue(RaftIdx, From, Seq, RawMsg, Effects0, State00) of - {ok, State0, Effects} -> - State = append_to_master_index(RaftIdx, State0), - checkout(State, Effects); + {ok, State0, Effects1} -> + {State, Effects, ok} = checkout(State0, Effects1), + {append_to_master_index(RaftIdx, State), Effects, ok}; {duplicate, State, Effects} -> {State, Effects, ok} end; @@ -263,7 +280,7 @@ apply(_, {return, MsgIds, ConsumerId}, Effects0, #{ConsumerId := Con0 = #consumer{checked_out = Checked0}} -> Checked = maps:without(MsgIds, Checked0), Returned = maps:with(MsgIds, Checked0), - MsgNumMsgs = [M || M <- maps:values(Returned)], + MsgNumMsgs = maps:values(Returned), return(ConsumerId, MsgNumMsgs, Con0, Checked, Effects0, State); _ -> {State, Effects0, ok} @@ -313,7 +330,8 @@ apply(_, {credit, NewCredit, RemoteDelCnt, Drain, ConsumerId}, Effects0, end; apply(_, {checkout, {dequeue, _}, {_Tag, _Pid}}, Effects0, #state{messages = M, - prefix_msg_count = 0} = State0) when map_size(M) == 0 -> + prefix_msg_counts = {0, 0}} = State0) when map_size(M) == 0 -> + %% FIXME: also check if there are returned messages %% TODO do we need metric visibility of empty get requests? {State0, Effects0, {dequeue, empty}}; apply(Meta, {checkout, {dequeue, settled}, ConsumerId}, @@ -357,7 +375,7 @@ apply(#{index := RaftIdx}, purge, Effects0, {StateAcc0, EffectsAcc0, ok}) -> MsgRaftIdxs = [RIdx || {_MsgInId, {RIdx, _}} <- maps:values(Checked0)], - complete(ConsumerId, MsgRaftIdxs, C, + complete(ConsumerId, MsgRaftIdxs, maps:size(Checked0), C, #{}, EffectsAcc0, StateAcc0) end, {State0, Effects0, ok}, Cons0), {State, Effects, _} = @@ -365,7 +383,7 @@ apply(#{index := RaftIdx}, purge, Effects0, RaftIdx, Indexes, State1#state{ra_indexes = rabbit_fifo_index:empty(), messages = #{}, - returns = queue:new(), + returns = lqueue:new(), low_msg_num = undefined}, Effects1), {State, [garbage_collection | Effects], {purge, Total}}; apply(_, {down, ConsumerPid, noconnection}, @@ -374,10 +392,16 @@ apply(_, {down, ConsumerPid, noconnection}, Node = node(ConsumerPid), % mark all consumers and enqueuers as suspect % and monitor the node - Cons = maps:map(fun({_, P}, C) when node(P) =:= Node -> - C#consumer{suspected_down = true}; - (_, C) -> C - end, Cons0), + {Cons, State} = maps:fold(fun({_, P} = K, #consumer{checked_out = Checked0} = C, + {Co, St0}) when node(P) =:= Node -> + St = return_all(St0, Checked0), + {maps:put(K, C#consumer{suspected_down = true, + checked_out = #{}}, + Co), + St}; + (K, C, {Co, St}) -> + {maps:put(K, C, Co), St} + end, {#{}, State0}, Cons0), Enqs = maps:map(fun(P, E) when node(P) =:= Node -> E#enqueuer{suspected_down = true}; (_, E) -> E @@ -388,7 +412,7 @@ apply(_, {down, ConsumerPid, noconnection}, _ -> [{monitor, node, Node} | Effects0] end, - {State0#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; + {State#state{consumers = Cons, enqueuers = Enqs}, Effects, ok}; apply(_, {down, Pid, _Info}, Effects0, #state{consumers = Cons0, enqueuers = Enqs0} = State0) -> @@ -411,7 +435,8 @@ apply(_, {down, Pid, _Info}, Effects0, checkout(State2, Effects1); apply(_, {nodeup, Node}, Effects0, #state{consumers = Cons0, - enqueuers = Enqs0} = State0) -> + enqueuers = Enqs0, + service_queue = SQ0} = State0) -> %% A node we are monitoring has come back. %% If we have suspected any processes of being %% down we should now re-issue the monitors for them to detect if they're @@ -427,26 +452,48 @@ apply(_, {nodeup, Node}, Effects0, (_, _, Acc) -> Acc end, [], Enqs0), Monitors = [{monitor, process, P} || P <- Cons ++ Enqs], + Enqs1 = maps:map(fun(P, E) when node(P) =:= Node -> + E#enqueuer{suspected_down = false}; + (_, E) -> E + end, Enqs0), + {Cons1, SQ, Effects} = + maps:fold(fun({_, P} = ConsumerId, C, {CAcc, SQAcc, EAcc}) + when node(P) =:= Node -> + update_or_remove_sub( + ConsumerId, C#consumer{suspected_down = false}, + CAcc, SQAcc, EAcc); + (_, _, Acc) -> + Acc + end, {Cons0, SQ0, Effects0}, Cons0), % TODO: avoid list concat - {State0, Monitors ++ Effects0, ok}; + checkout(State0#state{consumers = Cons1, enqueuers = Enqs1, + service_queue = SQ}, Monitors ++ Effects); apply(_, {nodedown, _Node}, Effects, State) -> - {State, Effects, ok}. + {State, Effects, ok}; +apply(_, {update_state, Conf}, Effects, State) -> + {update_state(Conf, State), Effects, ok}. -spec state_enter(ra_server:ra_state(), state()) -> ra_machine:effects(). -state_enter(leader, #state{consumers = Custs, +state_enter(leader, #state{consumers = Cons, enqueuers = Enqs, name = Name, + prefix_msg_counts = {0, 0}, become_leader_handler = BLH}) -> % return effects to monitor all current consumers and enqueuers - ConMons = [{monitor, process, P} || {_, P} <- maps:keys(Custs)], - EnqMons = [{monitor, process, P} || P <- maps:keys(Enqs)], - Effects = ConMons ++ EnqMons, + Pids = lists:usort(maps:keys(Enqs) ++ [P || {_, P} <- maps:keys(Cons)]), + Mons = [{monitor, process, P} || P <- Pids], + Nots = [{send_msg, P, leader_change, ra_event} || P <- Pids], + Effects = Mons ++ Nots, case BLH of undefined -> Effects; {Mod, Fun, Args} -> [{mod_call, Mod, Fun, Args ++ [Name]} | Effects] end; +state_enter(recovered, #state{prefix_msg_counts = PrefixMsgCounts}) + when PrefixMsgCounts =/= {0, 0} -> + %% TODO: remove assertion? + exit({rabbit_fifo, unexpected_prefix_msg_counts, PrefixMsgCounts}); state_enter(eol, #state{enqueuers = Enqs, consumers = Custs0}) -> Custs = maps:fold(fun({_, P}, V, S) -> S#{P => V} end, #{}, Custs0), [{send_msg, P, eol, ra_event} || P <- maps:keys(maps:merge(Enqs, Custs))]; @@ -583,9 +630,7 @@ cancel_consumer(ConsumerId, {Effects0, #state{consumers = C0, name = Name} = S0}) -> case maps:take(ConsumerId, C0) of {#consumer{checked_out = Checked0}, Cons} -> - S = maps:fold(fun (_, {MsgNum, Msg}, S) -> - return_one(MsgNum, Msg, S) - end, S0, Checked0), + S = return_all(S0, Checked0), Effects = cancel_consumer_effects(ConsumerId, Name, S, Effects0), case maps:size(Cons) of 0 -> @@ -678,8 +723,8 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, end, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), - State1 = lists:foldl(fun('$prefix_msg', #state{prefix_msg_count = MsgCount} = S0) -> - S0#state{prefix_msg_count = MsgCount + 1}; + State1 = lists:foldl(fun('$prefix_msg' = Msg, S0) -> + return_one(0, Msg, S0); ({MsgNum, Msg}, S0) -> return_one(MsgNum, Msg, S0) end, State0, MsgNumMsgs), @@ -688,14 +733,14 @@ return(ConsumerId, MsgNumMsgs, #consumer{lifetime = Life} = Con0, Checked, Effects). % used to processes messages that are finished -complete(ConsumerId, MsgRaftIdxs, +complete(ConsumerId, MsgRaftIdxs, NumDiscarded, Con0, Checked, Effects0, #state{consumers = Cons0, service_queue = SQ0, ra_indexes = Indexes0} = State0) -> - %% credit_mode = simple_prefetch should automatically top-up credit as messages - %% are simple_prefetch or otherwise returned + %% credit_mode = simple_prefetch should automatically top-up credit + %% as messages are simple_prefetch or otherwise returned Con = Con0#consumer{checked_out = Checked, - credit = increase_credit(Con0, length(MsgRaftIdxs))}, + credit = increase_credit(Con0, NumDiscarded)}, {Cons, SQ, Effects} = update_or_remove_sub(ConsumerId, Con, Cons0, SQ0, Effects0), Indexes = lists:foldl(fun rabbit_fifo_index:delete/2, Indexes0, MsgRaftIdxs), @@ -721,7 +766,10 @@ complete_and_checkout(IncomingRaftIdx, MsgIds, ConsumerId, Checked = maps:without(MsgIds, Checked0), Discarded = maps:with(MsgIds, Checked0), MsgRaftIdxs = [RIdx || {_, {RIdx, _}} <- maps:values(Discarded)], + %% need to pass the length of discarded as $prefix_msgs would be filtered + %% by the above list comprehension {State1, Effects1, _} = complete(ConsumerId, MsgRaftIdxs, + maps:size(Discarded), Con0, Checked, Effects0, State0), {State, Effects, _} = checkout(State1, Effects1), % settle metrics are incremented separately @@ -749,6 +797,7 @@ cancel_consumer_effects(Pid, Name, update_smallest_raft_index(IncomingRaftIdx, OldIndexes, #state{ra_indexes = Indexes, + % prefix_msg_count = 0, messages = Messages} = State, Effects) -> case rabbit_fifo_index:size(Indexes) of 0 when map_size(Messages) =:= 0 -> @@ -777,6 +826,9 @@ update_smallest_raft_index(IncomingRaftIdx, OldIndexes, end. % TODO update message then update messages and returns in single operations +return_one(0, '$prefix_msg', + #state{returns = Returns} = State0) -> + State0#state{returns = lqueue:in('$prefix_msg', Returns)}; return_one(MsgNum, {RaftId, {Header0, RawMsg}}, #state{messages = Messages, returns = Returns} = State0) -> @@ -786,8 +838,14 @@ return_one(MsgNum, {RaftId, {Header0, RawMsg}}, Msg = {RaftId, {Header, RawMsg}}, % this should not affect the release cursor in any way State0#state{messages = maps:put(MsgNum, Msg, Messages), - returns = queue:in(MsgNum, Returns)}. + returns = lqueue:in(MsgNum, Returns)}. +return_all(State, Checked) -> + maps:fold(fun (_, '$prefix_msg', S) -> + return_one(0, '$prefix_msg', S); + (_, {MsgNum, Msg}, S) -> + return_one(MsgNum, Msg, S) + end, State, Checked). checkout(State, Effects) -> checkout0(checkout_one(State), Effects, #{}). @@ -813,13 +871,20 @@ append_send_msg_effects(Effects0, AccMap) -> end, Effects0, AccMap), [{aux, active} | Effects]. +next_checkout_message(#state{prefix_msg_counts = {PReturns, P}} = State) + when PReturns > 0 -> + %% there are prefix returns, these should be served first + {'$prefix_msg', State#state{prefix_msg_counts = {PReturns - 1, P}}}; next_checkout_message(#state{returns = Returns, low_msg_num = Low0, + prefix_msg_counts = {R, P}, next_msg_num = NextMsgNum} = State) -> %% use peek rather than out there as the most likely case is an empty %% queue - case queue:peek(Returns) of - empty -> + case lqueue:peek(Returns) of + {value, Next} -> + {Next, State#state{returns = lqueue:drop(Returns)}}; + empty when P == 0 -> case Low0 of undefined -> {undefined, State}; @@ -832,25 +897,32 @@ next_checkout_message(#state{returns = Returns, {Low0, State#state{low_msg_num = Low}} end end; - {value, Next} -> - {Next, State#state{returns = queue:drop(Returns)}} + empty -> + %% There are prefix msgs + {'$prefix_msg', State#state{prefix_msg_counts = {R, P - 1}}} end. -take_next_msg(#state{prefix_msg_count = 0, - messages = Messages0} = State0) -> - {NextMsgInId, State} = next_checkout_message(State0), - %% messages are available - case maps:take(NextMsgInId, Messages0) of - {IdxMsg, Messages} -> - {{NextMsgInId, IdxMsg}, State, Messages, 0}; - error -> - error - end; -take_next_msg(#state{prefix_msg_count = MsgCount, - messages = Messages} = State) -> - %% there is still a prefix message count for the consumer - %% "fake" a '$prefix_msg' message - {'$prefix_msg', State, Messages, MsgCount - 1}. +%% next message is determined as follows: +%% First we check if there are are prefex returns +%% Then we check if there are current returns +%% then we check prefix msgs +%% then we check current messages +%% +%% When we return it is always done to the current return queue +%% for both prefix messages and current messages +take_next_msg(#state{messages = Messages0} = State0) -> + case next_checkout_message(State0) of + {'$prefix_msg', State} -> + {'$prefix_msg', State, Messages0}; + {NextMsgInId, State} -> + %% messages are available + case maps:take(NextMsgInId, Messages0) of + {IdxMsg, Messages} -> + {{NextMsgInId, IdxMsg}, State, Messages}; + error -> + error + end + end. send_msg_effect({CTag, CPid}, Msgs) -> {send_msg, CPid, {delivery, CTag, Msgs}, ra_event}. @@ -861,7 +933,7 @@ checkout_one(#state{service_queue = SQ0, case queue:peek(SQ0) of {value, ConsumerId} -> case take_next_msg(InitState) of - {ConsumerMsg, State0, Messages, PrefMsgC} -> + {ConsumerMsg, State0, Messages} -> SQ1 = queue:drop(SQ0), %% there are consumers waiting to be serviced %% process consumer checkout @@ -871,6 +943,8 @@ checkout_one(#state{service_queue = SQ0, %% can happen when draining %% recurse without consumer on queue checkout_one(InitState#state{service_queue = SQ1}); + {ok, #consumer{suspected_down = true}} -> + checkout_one(InitState#state{service_queue = SQ1}); {ok, #consumer{checked_out = Checked0, next_msg_id = Next, credit = Credit, @@ -885,7 +959,6 @@ checkout_one(#state{service_queue = SQ0, Cons0, SQ1, []), State = State0#state{service_queue = SQ, messages = Messages, - prefix_msg_count = PrefMsgC, consumers = Cons}, Msg = case ConsumerMsg of '$prefix_msg' -> '$prefix_msg'; @@ -976,17 +1049,30 @@ maybe_queue_consumer(ConsumerId, #consumer{credit = Credit}, %% creates a dehydrated version of the current state to be cached and %% potentially used to for a snaphot at a later point -dehydrate_state(#state{messages = Messages0, +dehydrate_state(#state{messages = Messages, consumers = Consumers, - prefix_msg_count = MsgCount} = State) -> + returns = Returns, + prefix_msg_counts = {PrefRetCnt, MsgCount}} = State) -> + %% TODO: optimise to avoid having to iterate the queue to get the number + %% of current returned messages + RetLen = lqueue:len(Returns), % O(1) + CurReturns = length([R || R <- lqueue:to_list(Returns), + R =/= '$prefix_msg']), + PrefixMsgCnt = MsgCount + maps:size(Messages) - CurReturns, State#state{messages = #{}, ra_indexes = rabbit_fifo_index:empty(), low_msg_num = undefined, consumers = maps:map(fun (_, C) -> - C#consumer{checked_out = #{}} + dehydrate_consumer(C) end, Consumers), - returns = queue:new(), - prefix_msg_count = maps:size(Messages0) + MsgCount}. + returns = lqueue:new(), + %% messages include returns + prefix_msg_counts = {RetLen + PrefRetCnt, + PrefixMsgCnt}}. + +dehydrate_consumer(#consumer{checked_out = Checked0} = Con) -> + Checked = maps:map(fun (_, _) -> '$prefix_msg' end, Checked0), + Con#consumer{checked_out = Checked}. -ifdef(TEST). @@ -1289,6 +1375,20 @@ down_with_noconnection_marks_suspect_and_node_is_monitored_test() -> ?ASSERT_EFF({monitor, process, P}, P =:= Self, Effects3), ok. +down_with_noconnection_returns_unack_test() -> + Pid = spawn(fun() -> ok end), + Cid = {<<"down_with_noconnect">>, Pid}, + {State0, _} = enq(1, 1, second, test_init(test)), + ?assertEqual(1, maps:size(State0#state.messages)), + ?assertEqual(0, lqueue:len(State0#state.returns)), + {State1, {_, _}} = deq(2, Cid, unsettled, State0), + ?assertEqual(0, maps:size(State1#state.messages)), + ?assertEqual(0, lqueue:len(State1#state.returns)), + {State2a, _, _} = apply(meta(3), {down, Pid, noconnection}, [], State1), + ?assertEqual(1, maps:size(State2a#state.messages)), + ?assertEqual(1, lqueue:len(State2a#state.returns)), + ok. + down_with_noproc_enqueuer_is_cleaned_up_test() -> State00 = test_init(test), Pid = spawn(fun() -> ok end), @@ -1351,7 +1451,7 @@ tick_test() -> ok. enq_deq_snapshot_recover_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, % OthPid = spawn(fun () -> ok end), % Oth = {<<"oth">>, OthPid}, @@ -1408,20 +1508,49 @@ snapshot_recover_test() -> ], run_snapshot_test(?FUNCTION_NAME, Commands). -enq_deq_return_snapshot_recover_test() -> +enq_deq_return_settle_snapshot_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + {return, [0], Cid}, %% should be re-delivered to Cid + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 + {settle, [1], Cid}, + {settle, [2], Cid} + ], + run_snapshot_test(?FUNCTION_NAME, Commands). + +return_prefix_msg_count_test() -> Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, - OthPid = spawn(fun () -> ok end), - Oth = {<<"oth">>, OthPid}, Commands = [ {enqueue, self(), 1, one}, - {enqueue, self(), 2, two}, - {checkout, {dequeue, unsettled}, Oth}, - {checkout, {dequeue, unsettled}, Cid}, - {settle, [0], Oth}, - {return, [0], Cid}, + {checkout, {auto, 1, simple_prefetch}, Cid}, + {checkout, cancel, Cid}, + {enqueue, self(), 2, two} %% Cid prefix_msg_count: 2 + ], + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, _Effects} = run_log(test_init(?FUNCTION_NAME), Entries), + ?debugFmt("return_prefix_msg_count_test state ~n~p~n", [State]), + ok. + + +return_settle_snapshot_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {enqueue, self(), 1, one}, %% to Cid + {checkout, {auto, 1, simple_prefetch}, Cid}, + {return, [0], Cid}, %% should be re-delivered to Oth + {enqueue, self(), 2, two}, %% Cid prefix_msg_count: 2 + {settle, [1], Cid}, + {return, [2], Cid}, + {settle, [3], Cid}, {enqueue, self(), 3, three}, - purge + purge, + {enqueue, self(), 4, four} ], run_snapshot_test(?FUNCTION_NAME, Commands). @@ -1436,17 +1565,67 @@ enq_check_settle_snapshot_recover_test() -> {settle, [0], Cid}, {enqueue, self(), 3, three}, {settle, [2], Cid} + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). +enq_check_settle_snapshot_purge_test() -> + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, + {enqueue, self(), 2, two}, + {settle, [1], Cid}, + {settle, [0], Cid}, + {enqueue, self(), 3, three}, + purge + ], + % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + run_snapshot_test(?FUNCTION_NAME, Commands). + +enq_check_settle_duplicate_test() -> + %% duplicate settle commands are likely + Tag = atom_to_binary(?FUNCTION_NAME, utf8), + Cid = {Tag, self()}, + Commands = [ + {checkout, {auto, 2, simple_prefetch}, Cid}, + {enqueue, self(), 1, one}, %% 0 + {enqueue, self(), 2, two}, %% 0 + {settle, [0], Cid}, + {settle, [1], Cid}, + {settle, [1], Cid}, + {enqueue, self(), 3, three}, + {settle, [2], Cid} ], % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), run_snapshot_test(?FUNCTION_NAME, Commands). +multi_return_snapshot_test() -> + %% this was discovered using property testing + C1 = {<<>>, c:pid(0,6723,1)}, + C2 = {<<0>>,c:pid(0,6723,1)}, + E = c:pid(0,6720,1), + Commands = [ + {checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg}, + {enqueue,E,2,msg}, + {checkout,cancel,C1}, %% both on returns queue + {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 + {return,[0],C2}, %% E1 in returns, E2 with C2 + {return,[1],C2}, %% E2 in returns E1 with C2 + {settle,[2],C2} %% E2 with C2 + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + + run_snapshot_test(Name, Commands) -> %% create every incremental permuation of the commands lists %% and run the snapshot tests against that [begin - % ?debugFmt("~w running commands ~w~n", [?FUNCTION_NAME, C]), + ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), run_snapshot_test0(Name, C) end || C <- prefixes(Commands, 1, [])]. @@ -1459,6 +1638,7 @@ run_snapshot_test0(Name, Commands) -> Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; (_) -> false end, Entries), + ?debugFmt("running from snapshot: ~b", [SnapIdx]), {S, _} = run_log(SnapState, Filtered), % assert log can be restored from any release cursor index % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", @@ -1474,7 +1654,7 @@ prefixes(Source, N, Acc) -> prefixes(Source, N+1, [X | Acc]). delivery_query_returns_deliveries_test() -> - Tag = <<"release_cursor_snapshot_state_test">>, + Tag = atom_to_binary(?FUNCTION_NAME, utf8), Cid = {Tag, self()}, Commands = [ {checkout, {auto, 5, simple_prefetch}, Cid}, @@ -1514,19 +1694,30 @@ state_enter_test() -> [{mod_call, m, f, [a, the_name]}] = state_enter(leader, S0), ok. -leader_monitors_on_state_enter_test() -> - Cid = {<<"cid">>, self()}, - {State0, [_, _]} = enq(1, 1, first, test_init(test)), - {State1, _} = check_auto(Cid, 2, State0), +state_enter_montors_and_notifications_test() -> + Oth = spawn(fun () -> ok end), + {State0, _} = enq(1, 1, first, test_init(test)), + Cid = {<<"adf">>, self()}, + OthCid = {<<"oth">>, Oth}, + {State1, _} = check(Cid, 2, State0), + {State, _} = check(OthCid, 3, State1), Self = self(), - %% as we have an enqueuer _and_ a consumer we chould - %% get two monitor effects in total, even if they are for the same - %% processs + Effects = state_enter(leader, State), + + %% monitor all enqueuers and consumers [{monitor, process, Self}, - {monitor, process, Self}] = state_enter(leader, State1), + {monitor, process, Oth}] = + lists:filter(fun ({monitor, process, _}) -> true; + (_) -> false + end, Effects), + [{send_msg, Self, leader_change, ra_event}, + {send_msg, Oth, leader_change, ra_event}] = + lists:filter(fun ({send_msg, _, leader_change, ra_event}) -> true; + (_) -> false + end, Effects), + ?ASSERT_EFF({monitor, process, _}, Effects), ok. - purge_test() -> Cid = {<<"purge_test">>, self()}, {State1, _} = enq(1, 1, first, test_init(test)), diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index c087e35fb2..635d85be4a 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -1,3 +1,19 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2007-2017 Pivotal Software, Inc. All rights reserved. +%% + %% @doc Provides an easy to consume API for interacting with the {@link rabbit_fifo.} %% state machine implementation running inside a `ra' raft system. %% @@ -21,7 +37,8 @@ handle_ra_event/3, untracked_enqueue/2, purge/1, - cluster_name/1 + cluster_name/1, + update_machine_state/2 ]). -include_lib("ra/include/ra.hrl"). @@ -375,6 +392,14 @@ purge(Node) -> cluster_name(#state{cluster_name = ClusterName}) -> ClusterName. +update_machine_state(Node, Conf) -> + case ra:process_command(Node, {update_state, Conf}) of + {ok, ok, _} -> + ok; + Err -> + Err + end. + %% @doc Handles incoming `ra_events'. Events carry both internal "bookeeping" %% events emitted by the `ra' leader as well as `rabbit_fifo' emitted events such %% as message deliveries. All ra events need to be handled by {@module} @@ -438,7 +463,8 @@ handle_ra_event(From, {applied, Seqs}, fun (Cid, {Settled, Returns, Discards}, Acc) -> add_command(Cid, settle, Settled, add_command(Cid, return, Returns, - add_command(Cid, discard, Discards, Acc))) + add_command(Cid, discard, + Discards, Acc))) end, [], State1#state.unsent_commands), Node = pick_node(State2), %% send all the settlements and returns @@ -456,10 +482,21 @@ handle_ra_event(From, {applied, Seqs}, end; handle_ra_event(Leader, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_delivery(Leader, Del, State0); +handle_ra_event(Leader, {machine, leader_change}, + #state{leader = Leader} = State) -> + %% leader already known + {internal, [], [], State}; +handle_ra_event(Leader, {machine, leader_change}, State0) -> + %% we need to update leader + %% and resend any pending commands + State = resend_all_pending(State0#state{leader = Leader}), + {internal, [], [], State}; handle_ra_event(_From, {rejected, {not_leader, undefined, _Seq}}, State0) -> % TODO: how should these be handled? re-sent on timer or try random {internal, [], [], State0}; handle_ra_event(_From, {rejected, {not_leader, Leader, Seq}}, State0) -> + % ?INFO("rabbit_fifo_client: rejected ~b not leader ~w leader: ~w~n", + % [Seq, From, Leader]), State1 = State0#state{leader = Leader}, State = resend(Seq, State1), {internal, [], [], State}; @@ -517,7 +554,9 @@ seq_applied({Seq, MaybeAction}, last_applied = Seq}}; error -> % must have already been resent or removed for some other reason - {Corrs, Actions, State} + % still need to update last_applied or we may inadvertently resend + % stuff later + {Corrs, Actions, State#state{last_applied = Seq}} end; seq_applied(_Seq, Acc) -> Acc. @@ -541,7 +580,7 @@ maybe_add_action(Action, Acc, State) -> {[Action | Acc], State}. do_resends(From, To, State) when From =< To -> - ?INFO("doing resends From ~w To ~w~n", [From, To]), + % ?INFO("rabbit_fifo_client: doing resends From ~w To ~w~n", [From, To]), lists:foldl(fun resend/2, State, lists:seq(From, To)); do_resends(_, _, State) -> State. @@ -556,6 +595,10 @@ resend(OldSeq, #state{pending = Pending0, leader = Leader} = State) -> State end. +resend_all_pending(#state{pending = Pend} = State) -> + Seqs = lists:sort(maps:keys(Pend)), + lists:foldl(fun resend/2, State, Seqs). + handle_delivery(Leader, {delivery, Tag, [{FstId, _} | _] = IdMsgs} = Del0, #state{consumer_deliveries = CDels0} = State0) -> {LastId, _} = lists:last(IdMsgs), @@ -610,6 +653,7 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) -> Missing. pick_node(#state{leader = undefined, servers = [N | _]}) -> + %% TODO: pick random rather that first? N; pick_node(#state{leader = Leader}) -> Leader. diff --git a/src/rabbit_fifo_index.erl b/src/rabbit_fifo_index.erl index e1848862fe..345a99a03c 100644 --- a/src/rabbit_fifo_index.erl +++ b/src/rabbit_fifo_index.erl @@ -15,83 +15,83 @@ -include_lib("ra/include/ra.hrl"). -compile({no_auto_import, [size/1]}). --record(state, {data = #{} :: #{integer() => term()}, - smallest :: undefined | non_neg_integer(), - largest :: undefined | non_neg_integer() - }). +-record(?MODULE, {data = #{} :: #{integer() => term()}, + smallest :: undefined | non_neg_integer(), + largest :: undefined | non_neg_integer() + }). --opaque state() :: #state{}. +-opaque state() :: #?MODULE{}. -export_type([state/0]). -spec empty() -> state(). empty() -> - #state{}. + #?MODULE{}. -spec fetch(integer(), state()) -> undefined | term(). -fetch(Key, #state{data = Data}) -> +fetch(Key, #?MODULE{data = Data}) -> maps:get(Key, Data, undefined). % only integer keys are supported -spec append(integer(), term(), state()) -> state(). append(Key, Value, - #state{data = Data, + #?MODULE{data = Data, smallest = Smallest, largest = Largest} = State) when Key > Largest orelse Largest =:= undefined -> - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = ra_lib:default(Smallest, Key), largest = Key}. -spec return(integer(), term(), state()) -> state(). -return(Key, Value, #state{data = Data, smallest = Smallest} = State) +return(Key, Value, #?MODULE{data = Data, smallest = Smallest} = State) when is_integer(Key) andalso Key < Smallest -> % TODO: this could potentially result in very large gaps which would % result in poor performance of smallest/1 % We could try to persist a linked list of "smallests" to make it quicker % to skip from one to the other - needs measurement - State#state{data = maps:put(Key, Value, Data), + State#?MODULE{data = maps:put(Key, Value, Data), smallest = Key}; -return(Key, Value, #state{data = Data} = State) +return(Key, Value, #?MODULE{data = Data} = State) when is_integer(Key) -> - State#state{data = maps:put(Key, Value, Data)}. + State#?MODULE{data = maps:put(Key, Value, Data)}. -spec delete(integer(), state()) -> state(). -delete(Smallest, #state{data = Data0, +delete(Smallest, #?MODULE{data = Data0, largest = Largest, smallest = Smallest} = State) -> Data = maps:remove(Smallest, Data0), case find_next(Smallest + 1, Largest, Data) of undefined -> - State#state{data = Data, + State#?MODULE{data = Data, smallest = undefined, largest = undefined}; Next -> - State#state{data = Data, smallest = Next} + State#?MODULE{data = Data, smallest = Next} end; -delete(Key, #state{data = Data} = State) -> - State#state{data = maps:remove(Key, Data)}. +delete(Key, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:remove(Key, Data)}. -spec size(state()) -> non_neg_integer(). -size(#state{data = Data}) -> +size(#?MODULE{data = Data}) -> maps:size(Data). -spec smallest(state()) -> undefined | {integer(), term()}. -smallest(#state{smallest = undefined}) -> +smallest(#?MODULE{smallest = undefined}) -> undefined; -smallest(#state{smallest = Smallest, data = Data}) -> +smallest(#?MODULE{smallest = Smallest, data = Data}) -> {Smallest, maps:get(Smallest, Data)}. -spec next_key_after(non_neg_integer(), state()) -> undefined | integer(). -next_key_after(_Idx, #state{smallest = undefined}) -> +next_key_after(_Idx, #?MODULE{smallest = undefined}) -> % map must be empty undefined; -next_key_after(Idx, #state{smallest = Smallest, +next_key_after(Idx, #?MODULE{smallest = Smallest, largest = Largest}) when Idx+1 < Smallest orelse Idx+1 > Largest -> undefined; -next_key_after(Idx, #state{data = Data} = State) -> +next_key_after(Idx, #?MODULE{data = Data} = State) -> Next = Idx+1, case maps:is_key(Next, Data) of true -> @@ -101,8 +101,8 @@ next_key_after(Idx, #state{data = Data} = State) -> end. -spec map(fun(), state()) -> state(). -map(F, #state{data = Data} = State) -> - State#state{data = maps:map(F, Data)}. +map(F, #?MODULE{data = Data} = State) -> + State#?MODULE{data = maps:map(F, Data)}. %% internal diff --git a/src/rabbit_lager.erl b/src/rabbit_lager.erl index 21934b5941..da40f207de 100644 --- a/src/rabbit_lager.erl +++ b/src/rabbit_lager.erl @@ -20,7 +20,7 @@ %% API -export([start_logger/0, log_locations/0, fold_sinks/2, - broker_is_started/0]). + broker_is_started/0, set_log_level/1]). %% For test purposes -export([configure_lager/0]). @@ -56,6 +56,34 @@ broker_is_started() -> ok end. +set_log_level(Level) -> + IsValidLevel = lists:member(Level, lager_util:levels()), + set_log_level(IsValidLevel, Level). + +set_log_level(true, Level) -> + SinksAndHandlers = [{Sink, gen_event:which_handlers(Sink)} || + Sink <- lager:list_all_sinks()], + set_sink_log_level(SinksAndHandlers, Level); +set_log_level(_, Level) -> + {error, {invalid_log_level, Level}}. + +set_sink_log_level([], _Level) -> + ok; +set_sink_log_level([{Sink, Handlers}|Rest], Level) -> + set_sink_handler_log_level(Sink, Handlers, Level), + set_sink_log_level(Rest, Level). + +set_sink_handler_log_level(_Sink, [], _Level) -> + ok; +set_sink_handler_log_level(Sink, [Handler|Rest], Level) when is_atom(Handler) -> + ok = lager:set_loglevel(Sink, Handler, undefined, Level), + set_sink_handler_log_level(Sink, Rest, Level); +set_sink_handler_log_level(Sink, [{Handler, Id}|Rest], Level) -> + ok = lager:set_loglevel(Sink, Handler, Id, Level), + set_sink_handler_log_level(Sink, Rest, Level); +set_sink_handler_log_level(Sink, [_|Rest], Level) -> + set_sink_handler_log_level(Sink, Rest, Level). + log_locations() -> ensure_lager_configured(), DefaultHandlers = application:get_env(lager, handlers, []), diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl index a3050c570f..04353423cc 100644 --- a/src/rabbit_mirror_queue_master.erl +++ b/src/rabbit_mirror_queue_master.erl @@ -462,7 +462,7 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% immediately after calling is_duplicate). The msg is %% invalid. We will not see this again, nor will we be %% further involved in confirming this message, so erase. - {true, State #state { seen_status = maps:remove(MsgId, SS) }}; + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS) }}; {ok, Disposition} when Disposition =:= confirmed %% It got published when we were a slave via gm, and @@ -477,8 +477,8 @@ is_duplicate(Message = #basic_message { id = MsgId }, %% Message was discarded while we were a slave. Confirm now. %% As above, amqqueue_process will have the entry for the %% msg_id_to_channel mapping. - {true, State #state { seen_status = maps:remove(MsgId, SS), - confirmed = [MsgId | Confirmed] }} + {{true, drop}, State #state { seen_status = maps:remove(MsgId, SS), + confirmed = [MsgId | Confirmed] }} end. set_queue_mode(Mode, State = #state { gm = GM, diff --git a/src/rabbit_networking.erl b/src/rabbit_networking.erl index 256d424740..cf431ee04f 100644 --- a/src/rabbit_networking.erl +++ b/src/rabbit_networking.erl @@ -36,7 +36,7 @@ connection_info_all/0, connection_info_all/1, emit_connection_info_all/4, emit_connection_info_local/3, close_connection/2, accept_ack/2, - tcp_host/1]). + handshake/2, tcp_host/1]). %% Used by TCP-based transports, e.g. STOMP adapter -export([tcp_listener_addresses/1, tcp_listener_spec/9, @@ -121,7 +121,6 @@ boot() -> %% Failures will throw exceptions _ = boot_listeners(fun boot_tcp/1, application:get_env(rabbit, num_tcp_acceptors, 10), "TCP"), _ = boot_listeners(fun boot_tls/1, application:get_env(rabbit, num_ssl_acceptors, 10), "TLS"), - _ = maybe_start_proxy_protocol(), ok. boot_listeners(Fun, NumAcceptors, Type) -> @@ -190,12 +189,6 @@ log_poodle_fail(Context) -> "'rabbit' section of your configuration file.~n", [rabbit_misc:otp_release(), Context]). -maybe_start_proxy_protocol() -> - case application:get_env(rabbit, proxy_protocol, false) of - false -> ok; - true -> application:start(ranch_proxy_protocol) - end. - fix_ssl_options(Config) -> rabbit_ssl_options:fix(Config). @@ -263,12 +256,9 @@ start_listener0(Address, NumAcceptors, Protocol, Label, Opts) -> end. transport(Protocol) -> - ProxyProtocol = application:get_env(rabbit, proxy_protocol, false), - case {Protocol, ProxyProtocol} of - {amqp, false} -> ranch_tcp; - {amqp, true} -> ranch_proxy; - {'amqp/ssl', false} -> ranch_ssl; - {'amqp/ssl', true} -> ranch_proxy_ssl + case Protocol of + amqp -> ranch_tcp; + 'amqp/ssl' -> ranch_ssl end. @@ -368,16 +358,31 @@ close_connection(Pid, Explanation) -> ok end. +handshake(Ref, ProxyProtocol) -> + case ProxyProtocol of + true -> + {ok, ProxyInfo} = ranch:recv_proxy_header(Ref, 1000), + {ok, Sock} = ranch:handshake(Ref), + tune_buffer_size(Sock), + ok = file_handle_cache:obtain(), + {ok, {rabbit_proxy_socket, Sock, ProxyInfo}}; + false -> + ranch:handshake(Ref) + end. + accept_ack(Ref, Sock) -> ok = ranch:accept_ack(Ref), - case tune_buffer_size(Sock) of + tune_buffer_size(Sock), + ok = file_handle_cache:obtain(). + +tune_buffer_size(Sock) -> + case tune_buffer_size1(Sock) of ok -> ok; {error, _} -> rabbit_net:fast_close(Sock), exit(normal) - end, - ok = file_handle_cache:obtain(). + end. -tune_buffer_size(Sock) -> +tune_buffer_size1(Sock) -> case rabbit_net:getopts(Sock, [sndbuf, recbuf, buffer]) of {ok, BufSizes} -> BufSz = lists:max([Sz || {_Opt, Sz} <- BufSizes]), rabbit_net:setopts(Sock, [{buffer, BufSz}]); diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f32d261d20..bbbeb3ce44 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -26,6 +26,8 @@ %%---------------------------------------------------------------------------- +-define(QUEUE, lqueue). + -define(UNSENT_MESSAGE_LIMIT, 200). %% Utilisation average calculations are all in μs. @@ -128,7 +130,7 @@ consumers(Consumers, Acc) -> count() -> lists:sum([Count || #cr{consumer_count = Count} <- all_ch_record()]). unacknowledged_message_count() -> - lists:sum([queue:len(C#cr.acktags) || C <- all_ch_record()]). + lists:sum([?QUEUE:len(C#cr.acktags) || C <- all_ch_record()]). add(ChPid, CTag, NoAck, LimiterPid, LimiterActive, Prefetch, Args, IsEmpty, Username, State = #state{consumers = Consumers, @@ -185,7 +187,7 @@ erase_ch(ChPid, State = #state{consumers = Consumers}) -> All = priority_queue:join(Consumers, BlockedQ), ok = erase_ch_record(C), Filtered = priority_queue:filter(chan_pred(ChPid, true), All), - {[AckTag || {AckTag, _CTag} <- queue:to_list(ChAckTags)], + {[AckTag || {AckTag, _CTag} <- ?QUEUE:to_list(ChAckTags)], tags(priority_queue:to_list(Filtered)), State#state{consumers = remove_consumers(ChPid, Consumers)}} end. @@ -267,7 +269,7 @@ deliver_to_consumer(FetchFun, rabbit_channel:deliver(ChPid, CTag, AckRequired, {QName, self(), AckTag, IsDelivered, Message}), ChAckTags1 = case AckRequired of - true -> queue:in({AckTag, CTag}, ChAckTags); + true -> ?QUEUE:in({AckTag, CTag}, ChAckTags); false -> ChAckTags end, update_ch_record(C#cr{acktags = ChAckTags1, @@ -280,7 +282,7 @@ is_blocked(Consumer = {ChPid, _C}) -> record_ack(ChPid, LimiterPid, AckTag) -> C = #cr{acktags = ChAckTags} = ch_record(ChPid, LimiterPid), - update_ch_record(C#cr{acktags = queue:in({AckTag, none}, ChAckTags)}), + update_ch_record(C#cr{acktags = ?QUEUE:in({AckTag, none}, ChAckTags)}), ok. subtract_acks(ChPid, AckTags, State) -> @@ -308,9 +310,9 @@ subtract_acks(ChPid, AckTags, State) -> subtract_acks([], [], CTagCounts, AckQ) -> {CTagCounts, AckQ}; subtract_acks([], Prefix, CTagCounts, AckQ) -> - {CTagCounts, queue:join(queue:from_list(lists:reverse(Prefix)), AckQ)}; + {CTagCounts, ?QUEUE:join(?QUEUE:from_list(lists:reverse(Prefix)), AckQ)}; subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> - case queue:out(AckQ) of + case ?QUEUE:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); @@ -437,7 +439,7 @@ ch_record(ChPid, LimiterPid) -> Limiter = rabbit_limiter:client(LimiterPid), C = #cr{ch_pid = ChPid, monitor_ref = MonitorRef, - acktags = queue:new(), + acktags = ?QUEUE:new(), consumer_count = 0, blocked_consumers = priority_queue:new(), limiter = Limiter, @@ -450,7 +452,7 @@ ch_record(ChPid, LimiterPid) -> update_ch_record(C = #cr{consumer_count = ConsumerCount, acktags = ChAckTags, unsent_message_count = UnsentMessageCount}) -> - case {queue:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of + case {?QUEUE:is_empty(ChAckTags), ConsumerCount, UnsentMessageCount} of {true, 0, 0} -> ok = erase_ch_record(C); _ -> ok = store_ch_record(C) end, diff --git a/src/rabbit_quorum_memory_manager.erl b/src/rabbit_quorum_memory_manager.erl new file mode 100644 index 0000000000..347f7f205e --- /dev/null +++ b/src/rabbit_quorum_memory_manager.erl @@ -0,0 +1,76 @@ +%% The contents of this file are subject to the Mozilla Public License +%% Version 1.1 (the "License"); you may not use this file except in +%% compliance with the License. You may obtain a copy of the License +%% at http://www.mozilla.org/MPL/ +%% +%% Software distributed under the License is distributed on an "AS IS" +%% basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See +%% the License for the specific language governing rights and +%% limitations under the License. +%% +%% The Original Code is RabbitMQ. +%% +%% The Initial Developer of the Original Code is GoPivotal, Inc. +%% Copyright (c) 2018 Pivotal Software, Inc. All rights reserved. +%% +-module(rabbit_quorum_memory_manager). + +-include("rabbit.hrl"). + +-export([init/1, handle_call/2, handle_event/2, handle_info/2, + terminate/2, code_change/3]). +-export([register/0, unregister/0]). + +-record(state, {last_roll_over, + interval}). + +-rabbit_boot_step({rabbit_quorum_memory_manager, + [{description, "quorum memory manager"}, + {mfa, {?MODULE, register, []}}, + {cleanup, {?MODULE, unregister, []}}, + {requires, rabbit_event}, + {enables, recovery}]}). + +register() -> + gen_event:add_handler(rabbit_alarm, ?MODULE, []). + +unregister() -> + gen_event:delete_handler(rabbit_alarm, ?MODULE, []). + +init([]) -> + {ok, #state{interval = interval()}}. + +handle_call( _, State) -> + {ok, ok, State}. + +handle_event({set_alarm, {{resource_limit, memory, Node}, []}}, + #state{last_roll_over = undefined} = State) when Node == node() -> + {ok, force_roll_over(State)}; +handle_event({set_alarm, {{resource_limit, memory, Node}, []}}, + #state{last_roll_over = Last, interval = Interval } = State) + when Node == node() -> + Now = erlang:system_time(millisecond), + case Now > (Last + Interval) of + true -> + {ok, force_roll_over(State)}; + false -> + {ok, State} + end; +handle_event(_, State) -> + {ok, State}. + +handle_info(_, State) -> + {ok, State}. + +terminate(_, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +force_roll_over(State) -> + ra_log_wal:force_roll_over(ra_log_wal), + State#state{last_roll_over = erlang:system_time(millisecond)}. + +interval() -> + application:get_env(rabbit, min_wal_roll_over_interval, 20000). diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index e24b907600..5e854a4657 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -17,7 +17,7 @@ -module(rabbit_quorum_queue). -export([init_state/2, handle_event/2]). --export([declare/1, recover/1, stop/1, delete/4, delete_immediately/1]). +-export([declare/1, recover/1, stop/1, delete/4, delete_immediately/2]). -export([info/1, info/2, stat/1, infos/1]). -export([ack/3, reject/4, basic_get/4, basic_consume/9, basic_cancel/4]). -export([credit/4]). @@ -34,6 +34,7 @@ -export([add_member/3]). -export([delete_member/3]). -export([requeue/3]). +-export([policy_changed/2]). -export([cleanup_data_dir/0]). -include_lib("rabbit_common/include/rabbit.hrl"). @@ -74,7 +75,7 @@ -spec infos(rabbit_types:r('queue')) -> rabbit_types:infos(). -spec stat(rabbit_types:amqqueue()) -> {'ok', non_neg_integer(), non_neg_integer()}. -spec cluster_state(Name :: atom()) -> 'down' | 'recovering' | 'running'. --spec status(rabbit_types:vhost(), Name :: atom()) -> rabbit_types:infos() | {error, term()}. +-spec status(rabbit_types:vhost(), Name :: rabbit_misc:resource_name()) -> rabbit_types:infos() | {error, term()}. -define(STATISTICS_KEYS, [policy, @@ -93,14 +94,17 @@ %%---------------------------------------------------------------------------- -spec init_state(ra_server_id(), rabbit_types:r('queue')) -> - rabbit_fifo_client:state(). + rabbit_fifo_client:state(). init_state({Name, _}, QName) -> {ok, SoftLimit} = application:get_env(rabbit, quorum_commands_soft_limit), - {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes0}} = + %% This lookup could potentially return an {error, not_found}, but we do not + %% know what to do if the queue has `disappeared`. Let it crash. + {ok, #amqqueue{pid = Leader, quorum_nodes = Nodes}} = rabbit_amqqueue:lookup(QName), %% Ensure the leader is listed first - Nodes = [Leader | lists:delete(Leader, Nodes0)], - rabbit_fifo_client:init(QName, Nodes, SoftLimit, + Servers0 = [{Name, N} || N <- Nodes], + Servers = [Leader | lists:delete(Leader, Servers0)], + rabbit_fifo_client:init(qname_to_rname(QName), Servers, SoftLimit, fun() -> credit_flow:block(Name), ok end, fun() -> credit_flow:unblock(Name), ok end). @@ -148,12 +152,14 @@ declare(#amqqueue{name = QName, -ra_machine(Q = #amqqueue{name = QName}) -> - {module, rabbit_fifo, - #{dead_letter_handler => dlx_mfa(Q), - cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, - become_leader_handler => {?MODULE, become_leader, [QName]}, - metrics_handler => {?MODULE, update_metrics, [QName]}}}. +ra_machine(Q) -> + {module, rabbit_fifo, ra_machine_config(Q)}. + +ra_machine_config(Q = #amqqueue{name = QName}) -> + #{dead_letter_handler => dlx_mfa(Q), + cancel_consumer_handler => {?MODULE, cancel_consumer, [QName]}, + become_leader_handler => {?MODULE, become_leader, [QName]}, + metrics_handler => {?MODULE, update_metrics, [QName]}}. cancel_consumer_handler(QName, {ConsumerTag, ChPid}, _Name) -> Node = node(ChPid), @@ -272,9 +278,10 @@ stop(VHost) -> delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = QNodes}, _IfUnused, _IfEmpty, ActingUser) -> %% TODO Quorum queue needs to support consumer tracking for IfUnused + Timeout = application:get_env(kernel, net_ticktime, 60000) + 5000, Msgs = quorum_messages(Name), _ = rabbit_amqqueue:internal_delete(QName, ActingUser), - case ra:delete_cluster([{Name, Node} || Node <- QNodes], 120000) of + case ra:delete_cluster([{Name, Node} || Node <- QNodes], Timeout) of {ok, {_, LeaderNode} = Leader} -> MRef = erlang:monitor(process, Leader), receive @@ -300,11 +307,10 @@ delete(#amqqueue{ type = quorum, pid = {Name, _}, name = QName, quorum_nodes = Q end end. -delete_immediately({Name, _} = QPid) -> - QName = queue_name(Name), - _ = rabbit_amqqueue:internal_delete(QName, ?INTERNAL_USER), - ok = ra:delete_cluster([QPid]), - rabbit_core_metrics:queue_deleted(QName), +delete_immediately(Resource, {_Name, _} = QPid) -> + _ = rabbit_amqqueue:internal_delete(Resource, ?INTERNAL_USER), + {ok, _} = ra:delete_cluster([QPid]), + rabbit_core_metrics:queue_deleted(Resource), ok. ack(CTag, MsgIds, QState) -> @@ -330,8 +336,10 @@ basic_get(#amqqueue{name = QName, pid = {Name, _} = Id, type = quorum}, NoAck, case rabbit_fifo_client:dequeue(CTag, Settlement, QState0) of {ok, empty, QState} -> {ok, empty, QState}; - {ok, {MsgId, {MsgHeader, Msg}}, QState} -> - IsDelivered = maps:is_key(delivery_count, MsgHeader), + {ok, {MsgId, {MsgHeader, Msg0}}, QState} -> + Count = maps:get(delivery_count, MsgHeader, 0), + IsDelivered = Count > 0, + Msg = rabbit_basic:add_header(<<"x-delivery-count">>, long, Count, Msg0), {ok, quorum_messages(Name), {QName, Id, MsgId, IsDelivered, Msg}, QState}; {timeout, _} -> {error, timeout} @@ -411,6 +419,10 @@ maybe_delete_data_dir(UId) -> ok end. +policy_changed(QName, Node) -> + {ok, Q} = rabbit_amqqueue:lookup(QName), + rabbit_fifo_client:update_machine_state(Node, ra_machine_config(Q)). + cluster_state(Name) -> case whereis(Name) of undefined -> down; @@ -547,9 +559,13 @@ args_policy_lookup(Name, Resolve, Q = #amqqueue{arguments = Args}) -> dead_letter_publish(undefined, _, _, _) -> ok; dead_letter_publish(X, RK, QName, ReasonMsgs) -> - {ok, Exchange} = rabbit_exchange:lookup(X), - [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName) - || {Reason, Msg} <- ReasonMsgs]. + case rabbit_exchange:lookup(X) of + {ok, Exchange} -> + [rabbit_dead_letter:publish(Msg, Reason, Exchange, RK, QName) + || {Reason, Msg} <- ReasonMsgs]; + {error, not_found} -> + ok + end. %% TODO escape hack qname_to_rname(#resource{virtual_host = <<"/">>, name = Name}) -> diff --git a/src/rabbit_reader.erl b/src/rabbit_reader.erl index 91002d0b94..8370d08ed9 100644 --- a/src/rabbit_reader.erl +++ b/src/rabbit_reader.erl @@ -57,12 +57,12 @@ -include("rabbit_framing.hrl"). -include("rabbit.hrl"). --export([start_link/3, info_keys/0, info/1, info/2, +-export([start_link/2, info_keys/0, info/1, info/2, shutdown/2]). -export([system_continue/3, system_terminate/4, system_code_change/4]). --export([init/4, mainloop/4, recvloop/4]). +-export([init/3, mainloop/4, recvloop/4]). -export([conserve_resources/3, server_properties/1]). @@ -157,7 +157,7 @@ %%-------------------------------------------------------------------------- --spec start_link(pid(), any(), rabbit_net:socket()) -> rabbit_types:ok(pid()). +-spec start_link(pid(), any()) -> rabbit_types:ok(pid()). -spec info_keys() -> rabbit_types:info_keys(). -spec info(pid()) -> rabbit_types:infos(). -spec info(pid(), rabbit_types:info_keys()) -> rabbit_types:infos(). @@ -170,7 +170,7 @@ rabbit_framing:amqp_table(). %% These specs only exists to add no_return() to keep dialyzer happy --spec init(pid(), pid(), any(), rabbit_net:socket()) -> no_return(). +-spec init(pid(), pid(), any()) -> no_return(). -spec start_connection(pid(), pid(), any(), rabbit_net:socket()) -> no_return(). @@ -181,18 +181,18 @@ %%-------------------------------------------------------------------------- -start_link(HelperSup, Ref, Sock) -> - Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref, Sock]), +start_link(HelperSup, Ref) -> + Pid = proc_lib:spawn_link(?MODULE, init, [self(), HelperSup, Ref]), {ok, Pid}. shutdown(Pid, Explanation) -> gen_server:call(Pid, {shutdown, Explanation}, infinity). -init(Parent, HelperSup, Ref, Sock) -> +init(Parent, HelperSup, Ref) -> ?LG_PROCESS_TYPE(reader), - RealSocket = rabbit_net:unwrap_socket(Sock), - rabbit_networking:accept_ack(Ref, RealSocket), + {ok, Sock} = rabbit_networking:handshake(Ref, + application:get_env(rabbit, proxy_protocol, false)), Deb = sys:debug_options([]), start_connection(Parent, HelperSup, Deb, Sock). diff --git a/src/rabbit_vhost.erl b/src/rabbit_vhost.erl index c460b02e5b..e462fc6bc0 100644 --- a/src/rabbit_vhost.erl +++ b/src/rabbit_vhost.erl @@ -208,7 +208,16 @@ delete_storage(VHost) -> VhostDir = msg_store_dir_path(VHost), rabbit_log:info("Deleting message store directory for vhost '~s' at '~s'~n", [VHost, VhostDir]), %% Message store should be closed when vhost supervisor is closed. - ok = rabbit_file:recursive_delete([VhostDir]). + case rabbit_file:recursive_delete([VhostDir]) of + ok -> ok; + {error, {_, enoent}} -> + %% a concurrent delete did the job for us + rabbit_log:warning("Tried to delete storage directories for vhost '~s', it failed with an ENOENT", [VHost]), + ok; + Other -> + rabbit_log:warning("Tried to delete storage directories for vhost '~s': ~p", [VHost, Other]), + Other + end. assert_benign(ok, _) -> ok; assert_benign({ok, _}, _) -> ok; diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 265bcb45e3..e495ab8677 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -35,7 +35,7 @@ memory() -> {Sums, _Other} = sum_processes( lists:append(All), distinguishers(), [memory]), - [Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, Sums, memory, fun (X) -> X end) || Names <- distinguished_interesting_sups()], @@ -69,7 +69,7 @@ memory() -> OtherProc = Processes - ConnsReader - ConnsWriter - ConnsChannel - ConnsOther - - Qs - QsSlave - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, + - Qs - QsSlave - Qqs - MsgIndexProc - Plugins - MgmtDbProc - MetricsProc, [ %% Connections @@ -81,6 +81,7 @@ memory() -> %% Queues {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, %% Processes {plugins, Plugins}, @@ -124,7 +125,7 @@ binary() -> sets:add_element({Ptr, Sz}, Acc0) end, Acc, Info) end, distinguishers(), [{binary, sets:new()}]), - [Other, Qs, QsSlave, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, + [Other, Qs, QsSlave, Qqs, ConnsReader, ConnsWriter, ConnsChannel, ConnsOther, MsgIndexProc, MgmtDbProc, Plugins] = [aggregate(Names, [{other, Rest} | Sums], binary, fun sum_binary/1) || Names <- [[other] | distinguished_interesting_sups()]], @@ -134,6 +135,7 @@ binary() -> {connection_other, ConnsOther}, {queue_procs, Qs}, {queue_slave_procs, QsSlave}, + {quorum_queue_procs, Qqs}, {plugins, Plugins}, {mgmt_db, MgmtDbProc}, {msg_index, MsgIndexProc}, @@ -173,11 +175,16 @@ bytes(Words) -> try end. interesting_sups() -> - [queue_sups(), conn_sups() | interesting_sups0()]. + [queue_sups(), quorum_sups(), conn_sups() | interesting_sups0()]. queue_sups() -> all_vhosts_children(rabbit_amqqueue_sup_sup). +quorum_sups() -> + %% TODO: in the future not all ra servers may be queues and we needs + %% some way to filter this + [ra_server_sup]. + msg_stores() -> all_vhosts_children(msg_store_transient) ++ @@ -229,6 +236,7 @@ distinguished_interesting_sups() -> [ with(queue_sups(), master), with(queue_sups(), slave), + quorum_sups(), with(conn_sups(), reader), with(conn_sups(), writer), with(conn_sups(), channel), diff --git a/test/config_schema_SUITE_data/rabbit.snippets b/test/config_schema_SUITE_data/rabbit.snippets index 625fcd93a9..b318adaa12 100644 --- a/test/config_schema_SUITE_data/rabbit.snippets +++ b/test/config_schema_SUITE_data/rabbit.snippets @@ -568,12 +568,27 @@ credential_validator.regexp = ^abc\\d+", {delegate_count, 64} ]}], []}, + {kernel_net_ticktime, "net_ticktime = 20", [{kernel, [ {net_ticktime, 20} ]}], []}, + + {kernel_inet_dist_listen_min, + "inet_dist_listen_min = 16000", + [{kernel, [ + {inet_dist_listen_min, 16000} + ]}], + []}, + {kernel_inet_dist_listen_max, + "inet_dist_listen_max = 16100", + [{kernel, [ + {inet_dist_listen_max, 16100} + ]}], + []}, + {log_syslog_settings, "log.syslog = true log.syslog.identity = rabbitmq diff --git a/test/quorum_queue_SUITE.erl b/test/quorum_queue_SUITE.erl index 19f3a66a1d..5b87c5be20 100644 --- a/test/quorum_queue_SUITE.erl +++ b/test/quorum_queue_SUITE.erl @@ -48,18 +48,24 @@ groups() -> ++ all_tests()}, {cluster_size_3, [], [ declare_during_node_down, + simple_confirm_availability_on_leader_change, + confirm_availability_on_leader_change, recover_from_single_failure, recover_from_multiple_failures, leadership_takeover, delete_declare, metrics_cleanup_on_leadership_takeover, metrics_cleanup_on_leader_crash, - consume_in_minority - ]}, + consume_in_minority]}, {cluster_size_5, [], [start_queue, start_queue_concurrent, quorum_cluster_size_3, quorum_cluster_size_7 + ]}, + {clustered_with_partitions, [], [ + reconnect_consumer_and_publish, + reconnect_consumer_and_wait, + reconnect_consumer_and_wait_channel_down ]} ]} ]. @@ -100,6 +106,7 @@ all_tests() -> dead_letter_to_classic_queue, dead_letter_to_quorum_queue, dead_letter_from_classic_to_quorum_queue, + dead_letter_policy, cleanup_queue_state_on_channel_after_publish, cleanup_queue_state_on_channel_after_subscribe, basic_cancel, @@ -108,7 +115,12 @@ all_tests() -> cancel_sync_queue, basic_recover, idempotent_recover, - vhost_with_quorum_queue_is_deleted + vhost_with_quorum_queue_is_deleted, + delete_immediately, + delete_immediately_by_resource, + consume_redelivery_count, + subscribe_redelivery_count, + memory_alarm_rolls_wal ]. %% ------------------------------------------------------------------- @@ -117,7 +129,9 @@ all_tests() -> init_per_suite(Config) -> rabbit_ct_helpers:log_environment(), - rabbit_ct_helpers:run_setup_steps(Config). + rabbit_ct_helpers:run_setup_steps( + Config, + [fun rabbit_ct_broker_helpers:enable_dist_proxy_manager/1]). end_per_suite(Config) -> rabbit_ct_helpers:run_teardown_steps(Config). @@ -126,6 +140,8 @@ init_per_group(clustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, true}]); init_per_group(unclustered, Config) -> rabbit_ct_helpers:set_config(Config, [{rmq_nodes_clustered, false}]); +init_per_group(clustered_with_partitions, Config) -> + rabbit_ct_helpers:set_config(Config, [{net_ticktime, 10}]); init_per_group(Group, Config) -> ClusterSize = case Group of single_node -> 1; @@ -137,7 +153,8 @@ init_per_group(Group, Config) -> [{rmq_nodes_count, ClusterSize}, {rmq_nodename_suffix, Group}, {tcp_ports_base}]), - Config2 = rabbit_ct_helpers:run_steps(Config1, + Config1b = rabbit_ct_helpers:set_config(Config1, [{net_ticktime, 10}]), + Config2 = rabbit_ct_helpers:run_steps(Config1b, [fun merge_app_env/1 ] ++ rabbit_ct_broker_helpers:setup_steps()), ok = rabbit_ct_broker_helpers:rpc( @@ -157,10 +174,28 @@ end_per_group(clustered, Config) -> Config; end_per_group(unclustered, Config) -> Config; +end_per_group(clustered_with_partitions, Config) -> + Config; end_per_group(_, Config) -> rabbit_ct_helpers:run_steps(Config, rabbit_ct_broker_helpers:teardown_steps()). +init_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), + Q = rabbit_data_coercion:to_binary(Testcase), + Config2 = rabbit_ct_helpers:set_config(Config1, + [{rmq_nodes_count, 3}, + {rmq_nodename_suffix, Testcase}, + {tcp_ports_base}, + {queue_name, Q} + ]), + rabbit_ct_helpers:run_steps(Config2, + rabbit_ct_broker_helpers:setup_steps() ++ + rabbit_ct_client_helpers:setup_steps() ++ + [fun rabbit_ct_broker_helpers:enable_dist_proxy/1, + fun rabbit_ct_broker_helpers:cluster_nodes/1]); init_per_testcase(Testcase, Config) -> Config1 = rabbit_ct_helpers:testcase_started(Config, Testcase), rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, delete_queues, []), @@ -168,12 +203,21 @@ init_per_testcase(Testcase, Config) -> Config2 = rabbit_ct_helpers:set_config(Config1, [{queue_name, Q} ]), - rabbit_ct_helpers:run_steps(Config2, - rabbit_ct_client_helpers:setup_steps()). + rabbit_ct_helpers:run_steps(Config2, rabbit_ct_client_helpers:setup_steps()). merge_app_env(Config) -> - rabbit_ct_helpers:merge_app_env(Config, {rabbit, [{core_metrics_gc_interval, 100}]}). - + rabbit_ct_helpers:merge_app_env( + rabbit_ct_helpers:merge_app_env(Config, + {rabbit, [{core_metrics_gc_interval, 100}]}), + {ra, [{min_wal_roll_over_interval, 30000}]}). + +end_per_testcase(Testcase, Config) when Testcase == reconnect_consumer_and_publish; + Testcase == reconnect_consumer_and_wait; + Testcase == reconnect_consumer_and_wait_channel_down -> + Config1 = rabbit_ct_helpers:run_steps(Config, + rabbit_ct_client_helpers:teardown_steps() ++ + rabbit_ct_broker_helpers:teardown_steps()), + rabbit_ct_helpers:testcase_finished(Config1, Testcase); end_per_testcase(Testcase, Config) -> catch delete_queues(), Config1 = rabbit_ct_helpers:run_steps( @@ -539,6 +583,19 @@ publish(Config) -> wait_for_messages_ready(Servers, Name, 1), wait_for_messages_pending_ack(Servers, Name, 0). +publish_confirm(Ch, QName) -> + publish(Ch, QName), + amqp_channel:register_confirm_handler(Ch, self()), + ct:pal("waiting for confirms from ~s", [QName]), + ok = receive + #'basic.ack'{} -> ok; + #'basic.nack'{} -> fail + after 2500 -> + exit(confirm_timeout) + end, + ct:pal("CONFIRMED! ~s", [QName]), + ok. + ra_name(Q) -> binary_to_atom(<<"%2F_", Q/binary>>, utf8). @@ -1064,22 +1121,47 @@ dead_letter_to_classic_queue(Config) -> {<<"x-dead-letter-routing-key">>, longstr, CQ} ])), ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), - RaName = ra_name(QQ), - publish(Ch, QQ), + test_dead_lettering(true, Config, Ch, Servers, ra_name(QQ), QQ, CQ). + +test_dead_lettering(PolicySet, Config, Ch, Servers, RaName, Source, Destination) -> + publish(Ch, Source), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), - DeliveryTag = consume(Ch, QQ, false), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), + DeliveryTag = consume(Ch, Source, false), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 1), - wait_for_messages(Config, [[CQ, <<"0">>, <<"0">>, <<"0">>]]), + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]), amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, multiple = false, requeue = false}), wait_for_messages_ready(Servers, RaName, 0), wait_for_messages_pending_ack(Servers, RaName, 0), - wait_for_messages(Config, [[CQ, <<"1">>, <<"1">>, <<"0">>]]), - _ = consume(Ch, CQ, false). + case PolicySet of + true -> + wait_for_messages(Config, [[Destination, <<"1">>, <<"1">>, <<"0">>]]), + _ = consume(Ch, Destination, true); + false -> + wait_for_messages(Config, [[Destination, <<"0">>, <<"0">>, <<"0">>]]) + end. + +dead_letter_policy(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + CQ = <<"classic-dead_letter_policy">>, + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + ?assertEqual({'queue.declare_ok', CQ, 0, 0}, declare(Ch, CQ, [])), + ok = rabbit_ct_broker_helpers:set_policy( + Config, 0, <<"dlx">>, <<"dead_letter.*">>, <<"queues">>, + [{<<"dead-letter-exchange">>, <<"">>}, + {<<"dead-letter-routing-key">>, CQ}]), + RaName = ra_name(QQ), + test_dead_lettering(true, Config, Ch, Servers, RaName, QQ, CQ), + ok = rabbit_ct_broker_helpers:clear_policy(Config, 0, <<"dlx">>), + test_dead_lettering(false, Config, Ch, Servers, RaName, QQ, CQ). dead_letter_to_quorum_queue(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1488,6 +1570,82 @@ declare_during_node_down(Config) -> wait_for_messages_ready(Servers, RaName, 1), ok. +simple_confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + publish_confirm(Ch, QQ), + + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + publish_confirm(Ch, QQ), + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +confirm_availability_on_leader_change(Config) -> + [Node1, Node2, _Node3] = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + %% declare a queue on node2 - this _should_ host the leader on node 2 + DCh = rabbit_ct_client_helpers:open_channel(Config, Node2), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(DCh, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + erlang:process_flag(trap_exit, true), + Pid = spawn_link(fun () -> + %% open a channel to another node + Ch = rabbit_ct_client_helpers:open_channel(Config, Node1), + #'confirm.select_ok'{} = amqp_channel:call(Ch, #'confirm.select'{}), + ConfirmLoop = fun Loop() -> + publish_confirm(Ch, QQ), + receive {done, P} -> + P ! done, + ok + after 0 -> Loop() end + end, + ConfirmLoop() + end), + + timer:sleep(500), + %% stop the node hosting the leader + stop_node(Config, Node2), + %% this should not fail as the channel should detect the new leader and + %% resend to that + timer:sleep(500), + Pid ! {done, self()}, + receive + done -> ok; + {'EXIT', Pid, Err} -> + exit(Err) + after 5500 -> + flush(100), + exit(bah) + end, + ok = rabbit_ct_broker_helpers:start_node(Config, Node2), + ok. + +flush(T) -> + receive X -> + ct:pal("flushed ~w", [X]), + flush(T) + after T -> + ok + end. + + add_member_not_running(Config) -> [Server | _] = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1587,6 +1745,7 @@ delete_member(Config) -> rpc:call(Server, rabbit_quorum_queue, delete_member, [<<"/">>, QQ, Server])). + cleanup_data_dir(Config) -> %% This test is slow, but also checks that we handle properly errors when %% trying to delete a queue in minority. A case clause there had gone @@ -1608,6 +1767,7 @@ cleanup_data_dir(Config) -> ?assertExit({{shutdown, {connection_closing, {server_initiated_close, 541, _}}}, _}, amqp_channel:call(Ch, #'queue.delete'{queue = QQ})), + catch amqp_channel:call(Ch, #'queue.delete'{queue = QQ}), ?assert(filelib:is_dir(DataDir)), ?assertEqual(ok, @@ -1615,6 +1775,132 @@ cleanup_data_dir(Config) -> [])), ?assert(not filelib:is_dir(DataDir)). +reconnect_consumer_and_publish(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 2), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1), + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = true}, _} -> + amqp_channel:cast(ChF, #'basic.ack'{delivery_tag = DeliveryTag, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +reconnect_consumer_and_wait_channel_down(Config) -> + [Server | _] = Servers = + rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + {ok, _, {_, Leader}} = ra:members({RaName, Server}), + [F1, F2] = lists:delete(Leader, Servers), + ChF = rabbit_ct_client_helpers:open_channel(Config, F1), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(ChF, QQ, false), + receive + {#'basic.deliver'{redelivered = false}, _} -> + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 1) + end, + Up = [Leader, F2], + rabbit_ct_broker_helpers:block_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:block_traffic_between(F1, F2), + wait_for_messages_ready(Up, RaName, 1), + wait_for_messages_pending_ack(Up, RaName, 0), + wait_for_messages_ready([F1], RaName, 0), + wait_for_messages_pending_ack([F1], RaName, 1), + rabbit_ct_client_helpers:close_channel(ChF), + rabbit_ct_broker_helpers:allow_traffic_between(F1, Leader), + rabbit_ct_broker_helpers:allow_traffic_between(F1, F2), + %% Let's give it a few seconds to ensure it doesn't attempt to + %% deliver to the down channel - it shouldn't be monitored + %% at this time! + timer:sleep(5000), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0). + basic_recover(Config) -> [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), @@ -1633,6 +1919,156 @@ basic_recover(Config) -> amqp_channel:cast(Ch, #'basic.recover'{requeue = true}), wait_for_messages_ready(Servers, RaName, 1), wait_for_messages_pending_ack(Servers, RaName, 0). + +delete_immediately(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + Args = [{<<"x-queue-type">>, longstr, <<"quorum">>}], + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, Args)), + + Cmd = ["eval", "{ok, Q} = rabbit_amqqueue:lookup(rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)), Pid = rabbit_amqqueue:pid_of(Q), rabbit_amqqueue:delete_immediately([Pid])."], + {ok, Msg} = rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd), + ?assertEqual(match, re:run(Msg, ".*error.*", [{capture, none}])), + + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + amqp_channel:call(Ch, #'queue.declare'{queue = QQ, + durable = true, + passive = true, + auto_delete = false, + arguments = Args})). + +delete_immediately_by_resource(Config) -> + Server = rabbit_ct_broker_helpers:get_node_config(Config, 0, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + Cmd2 = ["eval", "rabbit_amqqueue:delete_immediately_by_resource([rabbit_misc:r(<<\"/\">>, queue, <<\"" ++ binary_to_list(QQ) ++ "\">>)])."], + ?assertEqual({ok, "ok\n"}, rabbit_ct_broker_helpers:rabbitmqctl(Config, 0, Cmd2)), + + %% Check that the application and process are down + wait_until(fun() -> + [] == rpc:call(Server, supervisor, which_children, [ra_server_sup]) + end), + ?assertMatch({ra, _, _}, lists:keyfind(ra, 1, + rpc:call(Server, application, which_applications, []))). + +subscribe_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + subscribe(Ch, QQ, false), + + DTag = <<"x-delivery-count">>, + receive + {#'basic.deliver'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} -> + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} -> + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}) + end, + + receive + {#'basic.deliver'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} -> + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.ack'{delivery_tag = DeliveryTag2, + multiple = false}), + wait_for_messages_ready(Servers, RaName, 0), + wait_for_messages_pending_ack(Servers, RaName, 0) + end. + +consume_redelivery_count(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + RaName = ra_name(QQ), + publish(Ch, QQ), + wait_for_messages_ready(Servers, RaName, 1), + wait_for_messages_pending_ack(Servers, RaName, 0), + + DTag = <<"x-delivery-count">>, + + {#'basic.get_ok'{delivery_tag = DeliveryTag, + redelivered = false}, + #amqp_msg{props = #'P_basic'{headers = H0}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 0}, rabbit_basic:header(DTag, H0)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag, + multiple = false, + requeue = true}), + %% wait for requeueing + timer:sleep(500), + + {#'basic.get_ok'{delivery_tag = DeliveryTag1, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H1}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 1}, rabbit_basic:header(DTag, H1)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag1, + multiple = false, + requeue = true}), + + {#'basic.get_ok'{delivery_tag = DeliveryTag2, + redelivered = true}, + #amqp_msg{props = #'P_basic'{headers = H2}}} = + amqp_channel:call(Ch, #'basic.get'{queue = QQ, + no_ack = false}), + ?assertMatch({DTag, _, 2}, rabbit_basic:header(DTag, H2)), + amqp_channel:cast(Ch, #'basic.nack'{delivery_tag = DeliveryTag2, + multiple = false, + requeue = true}). + +memory_alarm_rolls_wal(Config) -> + [Server | _] = Servers = rabbit_ct_broker_helpers:get_node_configs(Config, nodename), + Ch = rabbit_ct_client_helpers:open_channel(Config, Server), + QQ = ?config(queue_name, Config), + ?assertEqual({'queue.declare_ok', QQ, 0, 0}, + declare(Ch, QQ, [{<<"x-queue-type">>, longstr, <<"quorum">>}])), + WalDataDir = rpc:call(Server, ra_env, wal_data_dir, []), + [Wal0] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ok = rpc:call(Server, rabbit_alarm, set_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + [Wal1] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ?assert(Wal0 =/= Wal1), + %% roll over shouldn't happen if we trigger a new alarm in less than + %% min_wal_roll_over_interval + ok = rpc:call(Server, rabbit_alarm, set_alarm, + [{{resource_limit, memory, Server}, []}]), + timer:sleep(1000), + [Wal2] = filelib:wildcard(WalDataDir ++ "/*.wal"), + ?assert(Wal1 == Wal2). + %%---------------------------------------------------------------------------- declare(Ch, Q) -> @@ -1684,7 +2120,7 @@ filter_queues(Expected, Got) -> end, Got). publish(Ch, Queue) -> - ok = amqp_channel:call(Ch, + ok = amqp_channel:cast(Ch, #'basic.publish'{routing_key = Queue}, #amqp_msg{props = #'P_basic'{delivery_mode = 2}, payload = <<"msg">>}). diff --git a/test/rabbit_fifo_SUITE.erl b/test/rabbit_fifo_SUITE.erl index a2e22afc2e..56608e9af3 100644 --- a/test/rabbit_fifo_SUITE.erl +++ b/test/rabbit_fifo_SUITE.erl @@ -99,8 +99,13 @@ basics(Config) -> _ = ra:stop_server(ServerId), _ = ra:restart_server(ServerId), - % give time to become leader - timer:sleep(500), + %% wait for leader change to notice server is up again + receive + {ra_event, _, {machine, leader_change}} -> ok + after 5000 -> + exit(leader_change_timeout) + end, + {ok, FState6} = rabbit_fifo_client:enqueue(two, FState5b), % process applied event FState6b = process_ra_event(FState6, 250), @@ -523,8 +528,9 @@ conf(ClusterName, UId, ServerId, _, Peers) -> process_ra_event(State, Wait) -> receive {ra_event, From, Evt} -> - % ct:pal("processed ra event ~p~n", [Evt]), - {internal, _, _, S} = rabbit_fifo_client:handle_ra_event(From, Evt, State), + ct:pal("processed ra event ~p~n", [Evt]), + {internal, _, _, S} = + rabbit_fifo_client:handle_ra_event(From, Evt, State), S after Wait -> exit(ra_event_timeout) diff --git a/test/rabbit_fifo_prop_SUITE.erl b/test/rabbit_fifo_prop_SUITE.erl new file mode 100644 index 0000000000..48e7b9aa7f --- /dev/null +++ b/test/rabbit_fifo_prop_SUITE.erl @@ -0,0 +1,348 @@ +-module(rabbit_fifo_prop_SUITE). + +-compile(export_all). + +-export([ + ]). + +-include_lib("common_test/include/ct.hrl"). +-include_lib("proper/include/proper.hrl"). +-include_lib("eunit/include/eunit.hrl"). + +%%%=================================================================== +%%% Common Test callbacks +%%%=================================================================== + +all() -> + [ + {group, tests} + ]. + + +all_tests() -> + [ + snapshots, + scenario1, + scenario2, + scenario3, + scenario4 + ]. + +groups() -> + [ + {tests, [], all_tests()} + ]. + +init_per_suite(Config) -> + Config. + +end_per_suite(_Config) -> + ok. + +init_per_group(_Group, Config) -> + Config. + +end_per_group(_Group, _Config) -> + ok. + +init_per_testcase(_TestCase, Config) -> + Config. + +end_per_testcase(_TestCase, _Config) -> + ok. + +%%%=================================================================== +%%% Test cases +%%%=================================================================== + +% -type log_op() :: +% {enqueue, pid(), maybe(msg_seqno()), Msg :: raw_msg()}. + +scenario1(_Config) -> + C1 = {<<>>, c:pid(0,6723,1)}, + C2 = {<<0>>,c:pid(0,6723,1)}, + E = c:pid(0,6720,1), + + Commands = [ + {checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {enqueue,E,2,msg2}, + {checkout,cancel,C1}, %% both on returns queue + {checkout,{auto,1,simple_prefetch},C2}, % on on return one on C2 + {return,[0],C2}, %% E1 in returns, E2 with C2 + {return,[1],C2}, %% E2 in returns E1 with C2 + {settle,[2],C2} %% E2 with C2 + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario2(_Config) -> + C1 = {<<>>, c:pid(0,346,1)}, + C2 = {<<>>,c:pid(0,379,1)}, + E = c:pid(0,327,1), + Commands = [{checkout,{auto,1,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {checkout,cancel,C1}, + {enqueue,E,2,msg2}, + {checkout,{auto,1,simple_prefetch},C2}, + {settle,[0],C1}, + {settle,[0],C2} + ], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario3(_Config) -> + C1 = {<<>>, c:pid(0,179,1)}, + E = c:pid(0,176,1), + Commands = [{checkout,{auto,2,simple_prefetch},C1}, + {enqueue,E,1,msg1}, + {return,[0],C1}, + {enqueue,E,2,msg2}, + {enqueue,E,3,msg3}, + {settle,[1],C1}, + {settle,[2],C1}], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +scenario4(_Config) -> + C1 = {<<>>, c:pid(0,179,1)}, + E = c:pid(0,176,1), +Commands = [{checkout,{auto,1,simple_prefetch},C1}, + {enqueue,E,1,msg}, + {settle,[0],C1}], + run_snapshot_test(?FUNCTION_NAME, Commands), + ok. + +snapshots(_Config) -> + run_proper( + fun () -> + ?FORALL(O, ?LET(Ops, log_gen(), expand(Ops)), + test1_prop(O)) + end, [], 1000). + +test1_prop(Commands) -> + ct:pal("Commands: ~p~n", [Commands]), + try run_snapshot_test(?FUNCTION_NAME, Commands) of + _ -> true + catch + Err -> + ct:pal("Err: ~p~n", [Err]), + false + end. + +log_gen() -> + ?LET(EPids, vector(2, pid_gen()), + ?LET(CPids, vector(2, pid_gen()), + resize(200, + list( + frequency( + [{20, enqueue_gen(oneof(EPids))}, + {40, {input_event, + frequency([{10, settle}, + {2, return}, + {1, discard}, + {1, requeue}])}}, + {2, checkout_gen(oneof(CPids))}, + {1, checkout_cancel_gen(oneof(CPids))}, + {1, down_gen(oneof(EPids ++ CPids))}, + {1, purge} + ]))))). + +pid_gen() -> + ?LET(_, integer(), spawn(fun () -> ok end)). + +down_gen(Pid) -> + ?LET(E, {down, Pid, oneof([noconnection, noproc])}, E). + +enqueue_gen(Pid) -> + ?LET(E, {enqueue, Pid, frequency([{10, enqueue}, + {1, delay}])}, E). + +checkout_cancel_gen(Pid) -> + {checkout, Pid, cancel}. + +checkout_gen(Pid) -> + %% pid, tag, prefetch + ?LET(C, {checkout, {binary(), Pid}, choose(1, 10)}, C). + + +-record(t, {state = rabbit_fifo:init(#{name => proper, + shadow_copy_interval => 1}) + :: rabbit_fifo:state(), + index = 1 :: non_neg_integer(), %% raft index + enqueuers = #{} :: #{pid() => term()}, + consumers = #{} :: #{{binary(), pid()} => term()}, + effects = queue:new() :: queue:queue(), + log = [] :: list(), + down = #{} :: #{pid() => noproc | noconnection} + }). + +expand(Ops) -> + %% execute each command against a rabbit_fifo state and capture all releavant + %% effects + T = #t{}, + #t{effects = Effs} = T1 = lists:foldl(fun handle_op/2, T, Ops), + %% process the remaining effects + #t{log = Log} = lists:foldl(fun do_apply/2, + T1#t{effects = queue:new()}, + queue:to_list(Effs)), + + lists:reverse(Log). + + +handle_op({enqueue, Pid, When}, #t{enqueuers = Enqs0, + down = Down, + effects = Effs} = T) -> + case Down of + #{Pid := noproc} -> + %% if it's a noproc then it cannot exist - can it? + %% drop operation + T; + _ -> + Enqs = maps:update_with(Pid, fun (Seq) -> Seq + 1 end, 1, Enqs0), + MsgSeq = maps:get(Pid, Enqs), + Cmd = {enqueue, Pid, MsgSeq, msg}, + case When of + enqueue -> + do_apply(Cmd, T#t{enqueuers = Enqs}); + delay -> + %% just put the command on the effects queue + ct:pal("delaying ~w", [Cmd]), + T#t{effects = queue:in(Cmd, Effs)} + end + end; +handle_op({checkout, Pid, cancel}, #t{consumers = Cons0} = T) -> + case maps:keys( + maps:filter(fun ({_, P}, _) when P == Pid -> true; + (_, _) -> false + end, Cons0)) of + [CId | _] -> + Cons = maps:remove(CId, Cons0), + Cmd = {checkout, cancel, CId}, + do_apply(Cmd, T#t{consumers = Cons}); + _ -> + T + end; +handle_op({checkout, CId, Prefetch}, #t{consumers = Cons0} = T) -> + case Cons0 of + #{CId := _} -> + %% ignore if it already exists + T; + _ -> + Cons = maps:put(CId, ok, Cons0), + Cmd = {checkout, {auto, Prefetch, simple_prefetch}, CId}, + do_apply(Cmd, T#t{consumers = Cons}) + end; +handle_op({down, Pid, Reason} = Cmd, #t{down = Down} = T) -> + case Down of + #{Pid := noproc} -> + %% it it permanently down, cannot upgrade + T; + _ -> + %% it is either not down or down with noconnection + do_apply(Cmd, T#t{down = maps:put(Pid, Reason, Down)}) + end; +handle_op({input_event, requeue}, #t{effects = Effs} = T) -> + %% this simulates certain settlements arriving out of order + case queue:out(Effs) of + {{value, Cmd}, Q} -> + T#t{effects = queue:in(Cmd, Q)}; + _ -> + T + end; +handle_op({input_event, Settlement}, #t{effects = Effs} = T) -> + case queue:out(Effs) of + {{value, {settle, MsgIds, CId}}, Q} -> + do_apply({Settlement, MsgIds, CId}, T#t{effects = Q}); + {{value, {enqueue, _, _, _} = Cmd}, Q} -> + do_apply(Cmd, T#t{effects = Q}); + _ -> + T + end; +handle_op(purge, T) -> + do_apply(purge, T). + +do_apply(Cmd, #t{effects = Effs, index = Index, state = S0, + log = Log} = T) -> + {S, Effects, _} = rabbit_fifo:apply(#{index => Index}, Cmd, [], S0), + T#t{state = S, + index = Index + 1, + effects = enq_effs(Effects, Effs), + log = [Cmd | Log]}. + +enq_effs([], Q) -> Q; +enq_effs([{send_msg, P, {delivery, CTag, Msgs}, ra_event} | Rem], Q) -> + MsgIds = [I || {I, _} <- Msgs], + %% always make settle commands by default + %% they can be changed depending on the input event later + Cmd = {settle, MsgIds, {CTag, P}}, + enq_effs(Rem, queue:in(Cmd, Q)); +enq_effs([_ | Rem], Q) -> + % ct:pal("enq_effs dropping ~w~n", [E]), + enq_effs(Rem, Q). + + +%% Utility +run_proper(Fun, Args, NumTests) -> + ?assertEqual( + true, + proper:counterexample( + erlang:apply(Fun, Args), + [{numtests, NumTests}, + {on_output, fun(".", _) -> ok; % don't print the '.'s on new lines + (F, A) -> ct:pal(?LOW_IMPORTANCE, F, A) + end}])). + +run_snapshot_test(Name, Commands) -> + %% create every incremental permuation of the commands lists + %% and run the snapshot tests against that + [begin + % ?debugFmt("~w running command to ~w~n", [?FUNCTION_NAME, lists:last(C)]), + run_snapshot_test0(Name, C) + end || C <- prefixes(Commands, 1, [])]. + +run_snapshot_test0(Name, Commands) -> + Indexes = lists:seq(1, length(Commands)), + Entries = lists:zip(Indexes, Commands), + {State, Effects} = run_log(test_init(Name), Entries), + + [begin + Filtered = lists:dropwhile(fun({X, _}) when X =< SnapIdx -> true; + (_) -> false + end, Entries), + % L = case Filtered of + % [] -> undefined; + % _ ->lists:last(Filtered) + % end, + + % ct:pal("running from snapshot: ~b to ~w" + % "~n~p~n", + % [SnapIdx, L, SnapState]), + {S, _} = run_log(SnapState, Filtered), + % assert log can be restored from any release cursor index + % ?debugFmt("Name ~p Idx ~p S~p~nState~p~nSnapState ~p~nFiltered ~p~n", + % [Name, SnapIdx, S, State, SnapState, Filtered]), + ?assertEqual(State, S) + end || {release_cursor, SnapIdx, SnapState} <- Effects], + ok. + +prefixes(Source, N, Acc) when N > length(Source) -> + lists:reverse(Acc); +prefixes(Source, N, Acc) -> + {X, _} = lists:split(N, Source), + prefixes(Source, N+1, [X | Acc]). + +run_log(InitState, Entries) -> + lists:foldl(fun ({Idx, E}, {Acc0, Efx0}) -> + case rabbit_fifo:apply(meta(Idx), E, Efx0, Acc0) of + {Acc, Efx, _} -> + {Acc, Efx} + end + end, {InitState, []}, Entries). + +test_init(Name) -> + rabbit_fifo:init(#{name => Name, + shadow_copy_interval => 0, + metrics_handler => {?MODULE, metrics_handler, []}}). +meta(Idx) -> + #{index => Idx, term => 1}. |
