diff options
author | Michael Klishin <michael@clojurewerkz.org> | 2021-04-30 01:16:00 +0300 |
---|---|---|
committer | Michael Klishin <michael@clojurewerkz.org> | 2021-04-30 01:16:00 +0300 |
commit | 7e8cfc28436b32350711be5acaa536f6bbb1db96 (patch) | |
tree | 000d05af680d4c67870631e6a221e72b8207fcd4 | |
parent | f6e78e7450003d4f5b8d0afcfb55cded4be5061f (diff) | |
parent | f90b4be949f4d4a0c552c3f5d75aa8cedcc518a0 (diff) | |
download | rabbitmq-server-git-mk-erlang-24-compatibility-backports.tar.gz |
Merge branch 'v3.8.x' into mk-erlang-24-compatibility-backportsmk-erlang-24-compatibility-backports
-rw-r--r-- | deps/rabbit/src/rabbit_amqqueue.erl | 2 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_slave.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_mirror_queue_sync.erl | 9 | ||||
-rw-r--r-- | deps/rabbit/src/rabbit_policy.erl | 22 | ||||
-rw-r--r-- | deps/rabbit/test/rabbit_ha_test_consumer.erl | 3 | ||||
-rw-r--r-- | deps/rabbit_common/src/rabbit_data_coercion.erl | 9 | ||||
-rw-r--r-- | deps/rabbit_common/test/unit_SUITE.erl | 12 | ||||
-rw-r--r-- | deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs | 2 |
8 files changed, 49 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl index 1bee56ed6c..e8d19928d3 100644 --- a/deps/rabbit/src/rabbit_amqqueue.erl +++ b/deps/rabbit/src/rabbit_amqqueue.erl @@ -596,7 +596,7 @@ maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) -> [{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired -> Name = amqqueue:get_name(Q), Module = rebalance_module(Q), - Candidates = Module:get_replicas(Q) -- [N], + Candidates = rabbit_maintenance:filter_out_drained_nodes_local_read(Module:get_replicas(Q) -- [N]), case Candidates of [] -> {not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)}; diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl index 6cd2706ea9..4f219a3659 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl @@ -385,8 +385,13 @@ handle_info({bump_credit, Msg}, State) -> credit_flow:handle_bump_msg(Msg), noreply(State); -handle_info(bump_reduce_memory_use, State) -> - noreply(State); +handle_info(bump_reduce_memory_use, State = #state{backing_queue = BQ, + backing_queue_state = BQS}) -> + BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS), + BQS2 = BQ:resume(BQS1), + noreply(State#state{ + backing_queue_state = BQS2 + }); %% In the event of a short partition during sync we can detect the %% master's 'death', drop out of sync, and then receive sync messages diff --git a/deps/rabbit/src/rabbit_mirror_queue_sync.erl b/deps/rabbit/src/rabbit_mirror_queue_sync.erl index 81f008bcef..896bdd5c61 100644 --- a/deps/rabbit/src/rabbit_mirror_queue_sync.erl +++ b/deps/rabbit/src/rabbit_mirror_queue_sync.erl @@ -288,6 +288,9 @@ wait_for_credit(SPids) -> end. wait_for_resources(Ref, SPids) -> + erlang:garbage_collect(), + % Probably bump_reduce_memory_use messages should be handled here as well, + % otherwise the BQ is not pushing messages to disk receive {conserve_resources, memory, false} -> SPids; @@ -367,7 +370,11 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent}, %% If the master throws an exception {'$gen_cast', {gm, {delete_and_terminate, Reason}}} -> BQ:delete_and_terminate(Reason, BQS), - {stop, Reason, {[], TRef, undefined}} + {stop, Reason, {[], TRef, undefined}}; + bump_reduce_memory_use -> + BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS), + BQS2 = BQ:resume(BQS1), + slave_sync_loop(Args, {MA, TRef, BQS2}) end. %% We are partitioning messages by the Unacked element in the tuple. diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl index 0c72e58d71..c7c7f88592 100644 --- a/deps/rabbit/src/rabbit_policy.erl +++ b/deps/rabbit/src/rabbit_policy.erl @@ -350,24 +350,26 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) -> rabbit_parameter_validation:proplist( Name, operator_policy_validation(), Term). -notify(VHost, <<"policy">>, Name, Term, ActingUser) -> +notify(VHost, <<"policy">>, Name, Term0, ActingUser) -> + update_policies(VHost), + Term = rabbit_data_coercion:atomize_keys(Term0), rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost}, - {user_who_performed_action, ActingUser} | Term]), - update_policies(VHost); -notify(VHost, <<"operator_policy">>, Name, Term, ActingUser) -> + {user_who_performed_action, ActingUser} | Term]); +notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) -> + update_policies(VHost), + Term = rabbit_data_coercion:atomize_keys(Term0), rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost}, - {user_who_performed_action, ActingUser} | Term]), - update_policies(VHost). + {user_who_performed_action, ActingUser} | Term]). notify_clear(VHost, <<"policy">>, Name, ActingUser) -> + update_policies(VHost), rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost}, - {user_who_performed_action, ActingUser}]), - update_policies(VHost); + {user_who_performed_action, ActingUser}]); notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) -> + update_policies(VHost), rabbit_event:notify(operator_policy_cleared, [{name, Name}, {vhost, VHost}, - {user_who_performed_action, ActingUser}]), - update_policies(VHost). + {user_who_performed_action, ActingUser}]). %%---------------------------------------------------------------------------- diff --git a/deps/rabbit/test/rabbit_ha_test_consumer.erl b/deps/rabbit/test/rabbit_ha_test_consumer.erl index f4abfdd098..b50ef3e323 100644 --- a/deps/rabbit/test/rabbit_ha_test_consumer.erl +++ b/deps/rabbit/test/rabbit_ha_test_consumer.erl @@ -54,9 +54,6 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) -> run(TestPid, Channel, Queue, CancelOnFailover, MsgNum, MsgsToConsume - 1); MsgNum >= LowestSeen -> - error_logger:info_msg( - "consumer ~p on ~p ignoring redelivered msg ~p~n", - [self(), Channel, MsgNum]), true = Redelivered, %% ASSERTION run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume); diff --git a/deps/rabbit_common/src/rabbit_data_coercion.erl b/deps/rabbit_common/src/rabbit_data_coercion.erl index a356e486d2..35f6171abf 100644 --- a/deps/rabbit_common/src/rabbit_data_coercion.erl +++ b/deps/rabbit_common/src/rabbit_data_coercion.erl @@ -8,7 +8,7 @@ -module(rabbit_data_coercion). -export([to_binary/1, to_list/1, to_atom/1, to_integer/1, to_proplist/1, to_map/1]). --export([to_atom/2]). +-export([to_atom/2, atomize_keys/1]). -spec to_binary(Val :: binary() | list() | atom() | integer()) -> binary(). to_binary(Val) when is_list(Val) -> list_to_binary(Val); @@ -45,3 +45,10 @@ to_proplist(Val) when is_map(Val) -> maps:to_list(Val). -spec to_map(Val :: map() | list()) -> map(). to_map(Val) when is_map(Val) -> Val; to_map(Val) when is_list(Val) -> maps:from_list(Val). + + +-spec atomize_keys(Val :: map() | list()) -> map() | list(). +atomize_keys(Val) when is_list(Val) -> + [{to_atom(K), V} || {K, V} <- Val]; +atomize_keys(Val) when is_map(Val) -> + maps:from_list(atomize_keys(maps:to_list(Val))).
\ No newline at end of file diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl index c6a2f5afdb..105488bed0 100644 --- a/deps/rabbit_common/test/unit_SUITE.erl +++ b/deps/rabbit_common/test/unit_SUITE.erl @@ -33,6 +33,8 @@ groups() -> data_coercion_to_proplist, data_coercion_to_list, data_coercion_to_map, + data_coercion_atomize_keys_proplist, + data_coercion_atomize_keys_map, pget, encrypt_decrypt, encrypt_decrypt_term, @@ -299,6 +301,16 @@ data_coercion_to_proplist(_Config) -> ?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist([{a, 1}])), ?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist(#{a => 1})). +data_coercion_atomize_keys_map(_Config) -> + A = #{a => 1, b => 2, c => 3}, + B = rabbit_data_coercion:atomize_keys(#{a => 1, "b" => 2, <<"c">> => 3}), + ?assertEqual(A, B). + +data_coercion_atomize_keys_proplist(_Config) -> + A = [{a, 1}, {b, 2}, {c, 3}], + B = rabbit_data_coercion:atomize_keys([{a, 1}, {"b", 2}, {<<"c">>, 3}]), + ?assertEqual(lists:usort(A), lists:usort(B)). + data_coercion_to_list(_Config) -> ?assertEqual([{a, 1}], rabbit_data_coercion:to_list([{a, 1}])), ?assertEqual([{a, 1}], rabbit_data_coercion:to_list(#{a => 1})). diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs index 7543580633..9a9e3afaa6 100644 --- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs +++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs @@ -290,7 +290,7 @@ <td> <select name="ackmode"> <option value="ack_requeue_true" selected>Nack message requeue true</option> - <option value="ack_requeue_false">Ack message requeue false</option> + <option value="ack_requeue_false">Automatic ack</option> <option value="reject_requeue_true">Reject requeue true</option> <option value="reject_requeue_false">Reject requeue false</option> </select> |