summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <michael@clojurewerkz.org>2021-04-30 01:16:00 +0300
committerMichael Klishin <michael@clojurewerkz.org>2021-04-30 01:16:00 +0300
commit7e8cfc28436b32350711be5acaa536f6bbb1db96 (patch)
tree000d05af680d4c67870631e6a221e72b8207fcd4
parentf6e78e7450003d4f5b8d0afcfb55cded4be5061f (diff)
parentf90b4be949f4d4a0c552c3f5d75aa8cedcc518a0 (diff)
downloadrabbitmq-server-git-mk-erlang-24-compatibility-backports.tar.gz
Merge branch 'v3.8.x' into mk-erlang-24-compatibility-backportsmk-erlang-24-compatibility-backports
-rw-r--r--deps/rabbit/src/rabbit_amqqueue.erl2
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_slave.erl9
-rw-r--r--deps/rabbit/src/rabbit_mirror_queue_sync.erl9
-rw-r--r--deps/rabbit/src/rabbit_policy.erl22
-rw-r--r--deps/rabbit/test/rabbit_ha_test_consumer.erl3
-rw-r--r--deps/rabbit_common/src/rabbit_data_coercion.erl9
-rw-r--r--deps/rabbit_common/test/unit_SUITE.erl12
-rw-r--r--deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs2
8 files changed, 49 insertions, 19 deletions
diff --git a/deps/rabbit/src/rabbit_amqqueue.erl b/deps/rabbit/src/rabbit_amqqueue.erl
index 1bee56ed6c..e8d19928d3 100644
--- a/deps/rabbit/src/rabbit_amqqueue.erl
+++ b/deps/rabbit/src/rabbit_amqqueue.erl
@@ -596,7 +596,7 @@ maybe_migrate(ByNode, MaxQueuesDesired, [N | Nodes]) ->
[{_, Q, false} = Queue | Queues] = All when length(All) > MaxQueuesDesired ->
Name = amqqueue:get_name(Q),
Module = rebalance_module(Q),
- Candidates = Module:get_replicas(Q) -- [N],
+ Candidates = rabbit_maintenance:filter_out_drained_nodes_local_read(Module:get_replicas(Q) -- [N]),
case Candidates of
[] ->
{not_migrated, update_not_migrated_queue(N, Queue, Queues, ByNode)};
diff --git a/deps/rabbit/src/rabbit_mirror_queue_slave.erl b/deps/rabbit/src/rabbit_mirror_queue_slave.erl
index 6cd2706ea9..4f219a3659 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_slave.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_slave.erl
@@ -385,8 +385,13 @@ handle_info({bump_credit, Msg}, State) ->
credit_flow:handle_bump_msg(Msg),
noreply(State);
-handle_info(bump_reduce_memory_use, State) ->
- noreply(State);
+handle_info(bump_reduce_memory_use, State = #state{backing_queue = BQ,
+ backing_queue_state = BQS}) ->
+ BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
+ BQS2 = BQ:resume(BQS1),
+ noreply(State#state{
+ backing_queue_state = BQS2
+ });
%% In the event of a short partition during sync we can detect the
%% master's 'death', drop out of sync, and then receive sync messages
diff --git a/deps/rabbit/src/rabbit_mirror_queue_sync.erl b/deps/rabbit/src/rabbit_mirror_queue_sync.erl
index 81f008bcef..896bdd5c61 100644
--- a/deps/rabbit/src/rabbit_mirror_queue_sync.erl
+++ b/deps/rabbit/src/rabbit_mirror_queue_sync.erl
@@ -288,6 +288,9 @@ wait_for_credit(SPids) ->
end.
wait_for_resources(Ref, SPids) ->
+ erlang:garbage_collect(),
+ % Probably bump_reduce_memory_use messages should be handled here as well,
+ % otherwise the BQ is not pushing messages to disk
receive
{conserve_resources, memory, false} ->
SPids;
@@ -367,7 +370,11 @@ slave_sync_loop(Args = {Ref, MRef, Syncer, BQ, UpdateRamDuration, Parent},
%% If the master throws an exception
{'$gen_cast', {gm, {delete_and_terminate, Reason}}} ->
BQ:delete_and_terminate(Reason, BQS),
- {stop, Reason, {[], TRef, undefined}}
+ {stop, Reason, {[], TRef, undefined}};
+ bump_reduce_memory_use ->
+ BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS),
+ BQS2 = BQ:resume(BQS1),
+ slave_sync_loop(Args, {MA, TRef, BQS2})
end.
%% We are partitioning messages by the Unacked element in the tuple.
diff --git a/deps/rabbit/src/rabbit_policy.erl b/deps/rabbit/src/rabbit_policy.erl
index 0c72e58d71..c7c7f88592 100644
--- a/deps/rabbit/src/rabbit_policy.erl
+++ b/deps/rabbit/src/rabbit_policy.erl
@@ -350,24 +350,26 @@ validate(_VHost, <<"operator_policy">>, Name, Term, _User) ->
rabbit_parameter_validation:proplist(
Name, operator_policy_validation(), Term).
-notify(VHost, <<"policy">>, Name, Term, ActingUser) ->
+notify(VHost, <<"policy">>, Name, Term0, ActingUser) ->
+ update_policies(VHost),
+ Term = rabbit_data_coercion:atomize_keys(Term0),
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
- {user_who_performed_action, ActingUser} | Term]),
- update_policies(VHost);
-notify(VHost, <<"operator_policy">>, Name, Term, ActingUser) ->
+ {user_who_performed_action, ActingUser} | Term]);
+notify(VHost, <<"operator_policy">>, Name, Term0, ActingUser) ->
+ update_policies(VHost),
+ Term = rabbit_data_coercion:atomize_keys(Term0),
rabbit_event:notify(policy_set, [{name, Name}, {vhost, VHost},
- {user_who_performed_action, ActingUser} | Term]),
- update_policies(VHost).
+ {user_who_performed_action, ActingUser} | Term]).
notify_clear(VHost, <<"policy">>, Name, ActingUser) ->
+ update_policies(VHost),
rabbit_event:notify(policy_cleared, [{name, Name}, {vhost, VHost},
- {user_who_performed_action, ActingUser}]),
- update_policies(VHost);
+ {user_who_performed_action, ActingUser}]);
notify_clear(VHost, <<"operator_policy">>, Name, ActingUser) ->
+ update_policies(VHost),
rabbit_event:notify(operator_policy_cleared,
[{name, Name}, {vhost, VHost},
- {user_who_performed_action, ActingUser}]),
- update_policies(VHost).
+ {user_who_performed_action, ActingUser}]).
%%----------------------------------------------------------------------------
diff --git a/deps/rabbit/test/rabbit_ha_test_consumer.erl b/deps/rabbit/test/rabbit_ha_test_consumer.erl
index f4abfdd098..b50ef3e323 100644
--- a/deps/rabbit/test/rabbit_ha_test_consumer.erl
+++ b/deps/rabbit/test/rabbit_ha_test_consumer.erl
@@ -54,9 +54,6 @@ run(TestPid, Channel, Queue, CancelOnFailover, LowestSeen, MsgsToConsume) ->
run(TestPid, Channel, Queue,
CancelOnFailover, MsgNum, MsgsToConsume - 1);
MsgNum >= LowestSeen ->
- error_logger:info_msg(
- "consumer ~p on ~p ignoring redelivered msg ~p~n",
- [self(), Channel, MsgNum]),
true = Redelivered, %% ASSERTION
run(TestPid, Channel, Queue,
CancelOnFailover, LowestSeen, MsgsToConsume);
diff --git a/deps/rabbit_common/src/rabbit_data_coercion.erl b/deps/rabbit_common/src/rabbit_data_coercion.erl
index a356e486d2..35f6171abf 100644
--- a/deps/rabbit_common/src/rabbit_data_coercion.erl
+++ b/deps/rabbit_common/src/rabbit_data_coercion.erl
@@ -8,7 +8,7 @@
-module(rabbit_data_coercion).
-export([to_binary/1, to_list/1, to_atom/1, to_integer/1, to_proplist/1, to_map/1]).
--export([to_atom/2]).
+-export([to_atom/2, atomize_keys/1]).
-spec to_binary(Val :: binary() | list() | atom() | integer()) -> binary().
to_binary(Val) when is_list(Val) -> list_to_binary(Val);
@@ -45,3 +45,10 @@ to_proplist(Val) when is_map(Val) -> maps:to_list(Val).
-spec to_map(Val :: map() | list()) -> map().
to_map(Val) when is_map(Val) -> Val;
to_map(Val) when is_list(Val) -> maps:from_list(Val).
+
+
+-spec atomize_keys(Val :: map() | list()) -> map() | list().
+atomize_keys(Val) when is_list(Val) ->
+ [{to_atom(K), V} || {K, V} <- Val];
+atomize_keys(Val) when is_map(Val) ->
+ maps:from_list(atomize_keys(maps:to_list(Val))). \ No newline at end of file
diff --git a/deps/rabbit_common/test/unit_SUITE.erl b/deps/rabbit_common/test/unit_SUITE.erl
index c6a2f5afdb..105488bed0 100644
--- a/deps/rabbit_common/test/unit_SUITE.erl
+++ b/deps/rabbit_common/test/unit_SUITE.erl
@@ -33,6 +33,8 @@ groups() ->
data_coercion_to_proplist,
data_coercion_to_list,
data_coercion_to_map,
+ data_coercion_atomize_keys_proplist,
+ data_coercion_atomize_keys_map,
pget,
encrypt_decrypt,
encrypt_decrypt_term,
@@ -299,6 +301,16 @@ data_coercion_to_proplist(_Config) ->
?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist([{a, 1}])),
?assertEqual([{a, 1}], rabbit_data_coercion:to_proplist(#{a => 1})).
+data_coercion_atomize_keys_map(_Config) ->
+ A = #{a => 1, b => 2, c => 3},
+ B = rabbit_data_coercion:atomize_keys(#{a => 1, "b" => 2, <<"c">> => 3}),
+ ?assertEqual(A, B).
+
+data_coercion_atomize_keys_proplist(_Config) ->
+ A = [{a, 1}, {b, 2}, {c, 3}],
+ B = rabbit_data_coercion:atomize_keys([{a, 1}, {"b", 2}, {<<"c">>, 3}]),
+ ?assertEqual(lists:usort(A), lists:usort(B)).
+
data_coercion_to_list(_Config) ->
?assertEqual([{a, 1}], rabbit_data_coercion:to_list([{a, 1}])),
?assertEqual([{a, 1}], rabbit_data_coercion:to_list(#{a => 1})).
diff --git a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
index 7543580633..9a9e3afaa6 100644
--- a/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
+++ b/deps/rabbitmq_management/priv/www/js/tmpl/queue.ejs
@@ -290,7 +290,7 @@
<td>
<select name="ackmode">
<option value="ack_requeue_true" selected>Nack message requeue true</option>
- <option value="ack_requeue_false">Ack message requeue false</option>
+ <option value="ack_requeue_false">Automatic ack</option>
<option value="reject_requeue_true">Reject requeue true</option>
<option value="reject_requeue_false">Reject requeue false</option>
</select>