diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/gm.erl | 19 | ||||
| -rw-r--r-- | src/rabbit.app.src | 5 | ||||
| -rw-r--r-- | src/rabbit.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 8 | ||||
| -rw-r--r-- | src/rabbit_diagnostics.erl | 1 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_mnesia.erl | 30 | ||||
| -rw-r--r-- | src/rabbit_mnesia_rename.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_policy.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_runtime_parameters.erl | 15 | ||||
| -rw-r--r-- | src/rabbit_table.erl | 65 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 111 | ||||
| -rw-r--r-- | src/rabbit_vhost_limit.erl | 7 | ||||
| -rw-r--r-- | src/rabbit_vm.erl | 3 |
14 files changed, 181 insertions, 113 deletions
diff --git a/src/gm.erl b/src/gm.erl index aa4ffcf511..3a1459fea4 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) -> try Group = #gm_group { members = Members, version = Ver } = check_membership(Left, read_group(GroupName)), - {Prefix, [Left | Suffix]} = - lists:splitwith(fun (M) -> M =/= Left end, Members), - write_group(Group #gm_group { - members = Prefix ++ [Left, NewMember | Suffix], - version = Ver + 1 }) + case lists:member(NewMember, Members) of + true -> + %% This avois duplicates during partial partitions, + %% as inconsistent views might happen during them + rabbit_log:warning("(~p) GM avoiding duplicate of ~p", + [self(), NewMember]), + Group; + false -> + {Prefix, [Left | Suffix]} = + lists:splitwith(fun (M) -> M =/= Left end, Members), + write_group(Group #gm_group { + members = Prefix ++ [Left, NewMember | Suffix], + version = Ver + 1 }) + end catch lost_membership -> %% The transaction must not be abruptly crashed, but diff --git a/src/rabbit.app.src b/src/rabbit.app.src index c06f7630fa..250b14bc32 100644 --- a/src/rabbit.app.src +++ b/src/rabbit.app.src @@ -12,7 +12,7 @@ rabbit_direct_client_sup]}, %% FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore %% indirect dependencies of rabbit. - {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl]}, + {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl, jsx]}, %% we also depend on crypto, public_key and ssl but they shouldn't be %% in here as we don't actually want to start it {mod, {rabbit, []}}, @@ -47,7 +47,8 @@ {server_properties, []}, {collect_statistics, none}, {collect_statistics_interval, 5000}, - {mnesia_table_loading_timeout, 30000}, + {mnesia_table_loading_retry_timeout, 30000}, + {mnesia_table_loading_retry_limit, 10}, {auth_mechanisms, ['PLAIN', 'AMQPLAIN']}, {auth_backends, [rabbit_auth_backend_internal]}, {delegate_count, 16}, diff --git a/src/rabbit.erl b/src/rabbit.erl index 2581fd49f6..6dde2aca88 100644 --- a/src/rabbit.erl +++ b/src/rabbit.erl @@ -19,7 +19,7 @@ -behaviour(application). -export([start/0, boot/0, stop/0, - stop_and_halt/0, await_startup/0, status/0, is_running/0, + stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0, is_running/1, environment/0, rotate_logs/0, force_event_refresh/1, start_fhc/0]). -export([start/2, stop/1, prep_stop/1]). @@ -1069,9 +1069,9 @@ ensure_working_fhc() -> end, TestPid = spawn_link(TestFun), %% Because we are waiting for the test fun, abuse the - %% 'mnesia_table_loading_timeout' parameter to find a sane timeout + %% 'mnesia_table_loading_retry_timeout' parameter to find a sane timeout %% value. - Timeout = rabbit_table:wait_timeout(), + Timeout = rabbit_table:retry_timeout(), receive fhc_ok -> ok; {'EXIT', TestPid, Exception} -> throw({ensure_working_fhc, Exception}) diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index 3e6d961d5f..25555156d6 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -1334,7 +1334,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) -> %% This also has the side effect of waking us up so we emit a %% stats event - so event consumers see the changed policy. {ok, Q} = rabbit_amqqueue:lookup(Name), - noreply(process_args_policy(State#q{q = Q})). + noreply(process_args_policy(State#q{q = Q})); + +handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) -> + %% Only a slave should receive this, it means we are a duplicated master + rabbit_mirror_queue_misc:log_warning( + Name, "Stopping after receiving sync_start from another master", []), + stop(State). handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) -> case is_unused(State) of diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl index d28bb9ffd7..e5df1d5baf 100644 --- a/src/rabbit_diagnostics.erl +++ b/src/rabbit_diagnostics.erl @@ -64,7 +64,6 @@ maybe_stuck_stacktrace({prim_inet, accept0, _}) -> false; maybe_stuck_stacktrace({prim_inet, recv0, _}) -> false; maybe_stuck_stacktrace({rabbit_heartbeat, heartbeater, _}) -> false; maybe_stuck_stacktrace({rabbit_net, recv, _}) -> false; -maybe_stuck_stacktrace({mochiweb_http, request, _}) -> false; maybe_stuck_stacktrace({group, _, _}) -> false; maybe_stuck_stacktrace({shell, _, _}) -> false; maybe_stuck_stacktrace({io, _, _}) -> false; diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index 29ba21d374..61623c9441 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -336,7 +336,12 @@ handle_cast({set_ram_duration_target, Duration}, State = #state { backing_queue = BQ, backing_queue_state = BQS }) -> BQS1 = BQ:set_ram_duration_target(Duration, BQS), - noreply(State #state { backing_queue_state = BQS1 }). + noreply(State #state { backing_queue_state = BQS1 }); + +handle_cast(policy_changed, State) -> + %% During partial partitions, we might end up receiving messages expected by a master + %% Ignore them + noreply(State). handle_info(update_ram_duration, State = #state{backing_queue = BQ, backing_queue_state = BQS}) -> diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl index d66f5b8fab..1ec9a46880 100644 --- a/src/rabbit_mnesia.erl +++ b/src/rabbit_mnesia.erl @@ -107,7 +107,7 @@ init() -> false -> NodeType = node_type(), init_db_and_upgrade(cluster_nodes(all), NodeType, - NodeType =:= ram) + NodeType =:= ram, _Retry = true) end, %% We intuitively expect the global name server to be synced when %% Mnesia is up. In fact that's not guaranteed to be the case - @@ -141,7 +141,7 @@ init_from_config() -> e(invalid_cluster_nodes_conf) end, case DiscoveredNodes of - [] -> init_db_and_upgrade([node()], disc, false); + [] -> init_db_and_upgrade([node()], disc, false, _Retry = true); _ -> rabbit_log:info("Discovered peer nodes: ~s~n", [rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]), @@ -153,14 +153,14 @@ auto_cluster(TryNodes, NodeType) -> {ok, Node} -> rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]), {ok, {_, DiscNodes, _}} = discover_cluster0(Node), - init_db_and_upgrade(DiscNodes, NodeType, true), + init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true), rabbit_connection_tracking:boot(), rabbit_node_monitor:notify_joined_cluster(); none -> rabbit_log:warning( "Could not find any node for auto-clustering from: ~p~n" "Starting blank node...~n", [TryNodes]), - init_db_and_upgrade([node()], disc, false) + init_db_and_upgrade([node()], disc, false, _Retry = true) end. %% Make the node join a cluster. The node will be reset automatically @@ -200,7 +200,7 @@ join_cluster(DiscoveryNode, NodeType) -> rabbit_log:info("Clustering with ~p as ~p node~n", [ClusterNodes, NodeType]), ok = init_db_with_mnesia(ClusterNodes, NodeType, - true, true), + true, true, _Retry = true), rabbit_connection_tracking:boot(), rabbit_node_monitor:notify_joined_cluster(), ok; @@ -240,7 +240,7 @@ reset_gracefully() -> %% need to check for consistency because we are resetting. %% Force=true here so that reset still works when clustered with a %% node which is down. - init_db_with_mnesia(AllNodes, node_type(), false, false), + init_db_with_mnesia(AllNodes, node_type(), false, false, _Retry = false), case is_only_clustered_disc_node() of true -> e(resetting_only_disc_node); false -> ok @@ -289,7 +289,7 @@ update_cluster_nodes(DiscoveryNode) -> rabbit_node_monitor:write_cluster_status(Status), rabbit_log:info("Updating cluster nodes from ~p~n", [DiscoveryNode]), - init_db_with_mnesia(AllNodes, node_type(), true, true); + init_db_with_mnesia(AllNodes, node_type(), true, true, _Retry = false); false -> e(inconsistent_cluster) end, @@ -339,7 +339,7 @@ remove_node_offline_node(Node) -> %% is by force loading the table, and making sure that %% they are loaded. rabbit_table:force_load(), - rabbit_table:wait_for_replicated(), + rabbit_table:wait_for_replicated(_Retry = false), %% We skip the 'node_deleted' event because the %% application is stopped and thus, rabbit_event is not %% enabled. @@ -487,7 +487,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> {[_ | _], _, _} -> %% Subsequent node in cluster, catch up maybe_force_load(), - ok = rabbit_table:wait_for_replicated(), + ok = rabbit_table:wait_for_replicated(_Retry = true), ok = rabbit_table:create_local_copy(NodeType) end, ensure_schema_integrity(), @@ -497,7 +497,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) -> init_db_unchecked(ClusterNodes, NodeType) -> init_db(ClusterNodes, NodeType, false). -init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> +init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry) -> ok = init_db(ClusterNodes, NodeType, CheckOtherNodes), ok = case rabbit_upgrade:maybe_upgrade_local() of ok -> ok; @@ -512,14 +512,14 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) -> disc -> ok end, %% ...and all nodes will need to wait for tables - rabbit_table:wait_for_replicated(), + rabbit_table:wait_for_replicated(Retry), ok. init_db_with_mnesia(ClusterNodes, NodeType, - CheckOtherNodes, CheckConsistency) -> + CheckOtherNodes, CheckConsistency, Retry) -> start_mnesia(CheckConsistency), try - init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) + init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry) after stop_mnesia() end. @@ -556,7 +556,7 @@ ensure_mnesia_not_running() -> end. ensure_schema_integrity() -> - case rabbit_table:check_schema_integrity() of + case rabbit_table:check_schema_integrity(_Retry = true) of ok -> ok; {error, Reason} -> @@ -687,7 +687,7 @@ discover_cluster0(Node) -> rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []). schema_ok_or_move() -> - case rabbit_table:check_schema_integrity() of + case rabbit_table:check_schema_integrity(_Retry = false) of ok -> ok; {error, Reason} -> diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl index 0c3e7c2366..2d7e0f56b6 100644 --- a/src/rabbit_mnesia_rename.erl +++ b/src/rabbit_mnesia_rename.erl @@ -193,7 +193,7 @@ delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]). start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia), rabbit_table:force_load(), - rabbit_table:wait_for_replicated(). + rabbit_table:wait_for_replicated(_Retry = false). stop_mnesia() -> stopped = mnesia:stop(). convert_backup(NodeMap, FromBackup, ToBackup) -> diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl index 7e39164882..cfbf116cbd 100644 --- a/src/rabbit_policy.erl +++ b/src/rabbit_policy.erl @@ -212,11 +212,12 @@ parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) -> end. parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) -> - case rabbit_misc:json_decode(Defn) of - {ok, JSON} -> + Definition = rabbit_data_coercion:to_binary(Defn), + case rabbit_json:try_decode(Definition) of + {ok, Term} -> set0(Type, VHost, Name, [{<<"pattern">>, list_to_binary(Pattern)}, - {<<"definition">>, rabbit_misc:json_to_term(JSON)}, + {<<"definition">>, maps:to_list(Term)}, {<<"priority">>, Priority}, {<<"apply-to">>, ApplyTo}]); error -> @@ -270,7 +271,7 @@ list_op(VHost) -> list0_op(VHost, fun ident/1). list_formatted_op(VHost) -> - order_policies(list0_op(VHost, fun format/1)). + order_policies(list0_op(VHost, fun rabbit_json:encode/1)). list_formatted_op(VHost, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map(AggregatorPid, Ref, @@ -288,7 +289,7 @@ list(VHost) -> list0(VHost, fun ident/1). list_formatted(VHost) -> - order_policies(list0(VHost, fun format/1)). + order_policies(list0(VHost, fun rabbit_json:encode/1)). list_formatted(VHost, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map(AggregatorPid, Ref, @@ -309,10 +310,6 @@ p(Parameter, DefnFun) -> {definition, DefnFun(pget(<<"definition">>, Value))}, {priority, pget(<<"priority">>, Value)}]. -format(Term) -> - {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), - list_to_binary(JSON). - ident(X) -> X. info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority]. diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl index 97f78da8ba..072a48be3d 100644 --- a/src/rabbit_runtime_parameters.erl +++ b/src/rabbit_runtime_parameters.erl @@ -97,9 +97,10 @@ parse_set(_, <<"policy">>, _, _, _) -> {error_string, "policies may not be set using this method"}; parse_set(VHost, Component, Name, String, User) -> - case rabbit_misc:json_decode(String) of - {ok, JSON} -> set(VHost, Component, Name, - rabbit_misc:json_to_term(JSON), User); + Definition = rabbit_data_coercion:to_binary(String), + case rabbit_json:try_decode(Definition) of + {ok, Term} when is_map(Term) -> set(VHost, Component, Name, maps:to_list(Term), User); + {ok, Term} -> set(VHost, Component, Name, Term, User); error -> {error_string, "JSON decoding error"} end. @@ -235,12 +236,12 @@ list(VHost, Component) -> end). list_formatted(VHost) -> - [pset(value, format(pget(value, P)), P) || P <- list(VHost)]. + [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)]. list_formatted(VHost, Ref, AggregatorPid) -> rabbit_control_misc:emitting_map( AggregatorPid, Ref, - fun(P) -> pset(value, format(pget(value, P)), P) end, list(VHost)). + fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)). lookup(VHost, Component, Name) -> case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of @@ -303,10 +304,6 @@ lookup_component(Component) -> {ok, Module} -> {ok, Module} end. -format(Term) -> - {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)), - list_to_binary(JSON). - flatten_errors(L) -> case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of [] -> ok; diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl index 1bb19b23da..c8946e179d 100644 --- a/src/rabbit_table.erl +++ b/src/rabbit_table.erl @@ -16,24 +16,25 @@ -module(rabbit_table). --export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1, +-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1, force_load/0, is_present/0, is_empty/0, needs_default_data/0, - check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]). + check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0]). -include("rabbit.hrl"). %%---------------------------------------------------------------------------- +-type retry() :: boolean(). -spec create() -> 'ok'. -spec create_local_copy('disc' | 'ram') -> 'ok'. --spec wait_for_replicated() -> 'ok'. +-spec wait_for_replicated(retry()) -> 'ok'. -spec wait([atom()]) -> 'ok'. --spec wait_timeout() -> non_neg_integer() | infinity. +-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}. -spec force_load() -> 'ok'. -spec is_present() -> boolean(). -spec is_empty() -> boolean(). -spec needs_default_data() -> boolean(). --spec check_schema_integrity() -> rabbit_types:ok_or_error(any()). +-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()). -spec clear_ram_only_tables() -> 'ok'. %%---------------------------------------------------------------------------- @@ -75,25 +76,53 @@ create_local_copy(ram) -> create_local_copies(ram), create_local_copy(schema, ram_copies). -wait_for_replicated() -> +wait_for_replicated(Retry) -> wait([Tab || {Tab, TabDef} <- definitions(), - not lists:member({local_content, true}, TabDef)]). + not lists:member({local_content, true}, TabDef)], Retry). wait(TableNames) -> + wait(TableNames, _Retry = false). + +wait(TableNames, Retry) -> + {Timeout, Retries} = retry_timeout(Retry), + wait(TableNames, Timeout, Retries). + +wait(TableNames, Timeout, Retries) -> %% We might be in ctl here for offline ops, in which case we can't %% get_env() for the rabbit app. - Timeout = wait_timeout(), - case mnesia:wait_for_tables(TableNames, Timeout) of - ok -> + rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left~n", + [Timeout, Retries - 1]), + Result = case mnesia:wait_for_tables(TableNames, Timeout) of + ok -> + ok; + {timeout, BadTabs} -> + {error, {timeout_waiting_for_tables, BadTabs}}; + {error, Reason} -> + {error, {failed_waiting_for_tables, Reason}} + end, + case {Retries, Result} of + {_, ok} -> ok; - {timeout, BadTabs} -> - throw({error, {timeout_waiting_for_tables, BadTabs}}); - {error, Reason} -> - throw({error, {failed_waiting_for_tables, Reason}}) + {1, {error, _} = Error} -> + throw(Error); + {_, {error, Error}} -> + rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]), + wait(TableNames, Timeout, Retries - 1); + _ -> + wait(TableNames, Timeout, Retries - 1) end. -wait_timeout() -> - case application:get_env(rabbit, mnesia_table_loading_timeout) of +retry_timeout(_Retry = false) -> + {retry_timeout(), 1}; +retry_timeout(_Retry = true) -> + Retries = case application:get_env(rabbit, mnesia_table_loading_retry_limit) of + {ok, T} -> T; + undefined -> 10 + end, + {retry_timeout(), Retries}. + +retry_timeout() -> + case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of {ok, T} -> T; undefined -> 30000 end. @@ -110,7 +139,7 @@ is_empty(Names) -> lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end, Names). -check_schema_integrity() -> +check_schema_integrity(Retry) -> Tables = mnesia:system_info(tables), case check(fun (Tab, TabDef) -> case lists:member(Tab, Tables) of @@ -118,7 +147,7 @@ check_schema_integrity() -> true -> check_attributes(Tab, TabDef) end end) of - ok -> ok = wait(names()), + ok -> wait(names(), Retry), check(fun check_content/2); Other -> Other end. diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 961246058e..95604d08f4 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -682,25 +682,28 @@ ack([], State) -> %% optimisation: this head is essentially a partial evaluation of the %% general case below, for the single-ack case. ack([SeqId], State) -> - {#msg_status { msg_id = MsgId, - is_persistent = IsPersistent, - msg_in_store = MsgInStore, - index_on_disk = IndexOnDisk }, - State1 = #vqstate { index_state = IndexState, - msg_store_clients = MSCState, - ack_out_counter = AckOutCount }} = - remove_pending_ack(true, SeqId, State), - IndexState1 = case IndexOnDisk of - true -> rabbit_queue_index:ack([SeqId], IndexState); - false -> IndexState - end, - case MsgInStore of - true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); - false -> ok - end, - {[MsgId], - a(State1 #vqstate { index_state = IndexState1, - ack_out_counter = AckOutCount + 1 })}; + case remove_pending_ack(true, SeqId, State) of + {none, _} -> + State; + {#msg_status { msg_id = MsgId, + is_persistent = IsPersistent, + msg_in_store = MsgInStore, + index_on_disk = IndexOnDisk }, + State1 = #vqstate { index_state = IndexState, + msg_store_clients = MSCState, + ack_out_counter = AckOutCount }} -> + IndexState1 = case IndexOnDisk of + true -> rabbit_queue_index:ack([SeqId], IndexState); + false -> IndexState + end, + case MsgInStore of + true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]); + false -> ok + end, + {[MsgId], + a(State1 #vqstate { index_state = IndexState1, + ack_out_counter = AckOutCount + 1 })} + end; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds}, State1 = #vqstate { index_state = IndexState, @@ -708,8 +711,12 @@ ack(AckTags, State) -> ack_out_counter = AckOutCount }} = lists:foldl( fun (SeqId, {Acc, State2}) -> - {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2), - {accumulate_ack(MsgStatus, Acc), State3} + case remove_pending_ack(true, SeqId, State2) of + {none, _} -> + {Acc, State2}; + {MsgStatus, State3} -> + {accumulate_ack(MsgStatus, Acc), State3} + end end, {accumulate_ack_init(), State}, AckTags), IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState), remove_msgs_by_id(MsgIdsByStore, MSCState), @@ -2030,8 +2037,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA, %% First parameter = UpdateStats remove_pending_ack(true, SeqId, State) -> - {MsgStatus, State1} = remove_pending_ack(false, SeqId, State), - {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}; + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {MsgStatus, State1} -> + {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)} + end; remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, disk_pending_ack = DPA, qi_pending_ack = QPA}) -> @@ -2043,9 +2054,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, DPA1 = gb_trees:delete(SeqId, DPA), {V, State#vqstate{disk_pending_ack = DPA1}}; none -> - QPA1 = gb_trees:delete(SeqId, QPA), - {gb_trees:get(SeqId, QPA), - State#vqstate{qi_pending_ack = QPA1}} + case gb_trees:lookup(SeqId, QPA) of + {value, V} -> + QPA1 = gb_trees:delete(SeqId, QPA), + {V, State#vqstate{qi_pending_ack = QPA1}}; + none -> + {none, State} + end end end. @@ -2196,11 +2211,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds, Limit, PubFun, State); {_, _Q1} -> %% enqueue from the remaining list of sequence ids - {MsgStatus, State1} = msg_from_pending_ack(SeqId, State), - {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = - PubFun(MsgStatus, State1), - queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], - Limit, PubFun, State2) + case msg_from_pending_ack(SeqId, State) of + {none, _} -> + queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State); + {MsgStatus, State1} -> + {#msg_status { msg_id = MsgId } = MsgStatus1, State2} = + PubFun(MsgStatus, State1), + queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds], + Limit, PubFun, State2) + end end; queue_merge(SeqIds, Q, Front, MsgIds, _Limit, _PubFun, State) -> @@ -2209,22 +2228,28 @@ queue_merge(SeqIds, Q, Front, MsgIds, delta_merge([], Delta, MsgIds, State) -> {Delta, MsgIds, State}; delta_merge(SeqIds, Delta, MsgIds, State) -> - lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) -> - {#msg_status { msg_id = MsgId } = MsgStatus, State1} = - msg_from_pending_ack(SeqId, State0), - {_MsgStatus, State2} = - maybe_prepare_write_to_disk(true, true, MsgStatus, State1), - {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], - stats({1, -1}, {MsgStatus, none}, State2)} + lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) -> + case msg_from_pending_ack(SeqId, State0) of + {none, _} -> + Acc; + {#msg_status { msg_id = MsgId } = MsgStatus, State1} -> + {_MsgStatus, State2} = + maybe_prepare_write_to_disk(true, true, MsgStatus, State1), + {expand_delta(SeqId, Delta0), [MsgId | MsgIds0], + stats({1, -1}, {MsgStatus, none}, State2)} + end end, {Delta, MsgIds, State}, SeqIds). %% Mostly opposite of record_pending_ack/2 msg_from_pending_ack(SeqId, State) -> - {#msg_status { msg_props = MsgProps } = MsgStatus, State1} = - remove_pending_ack(false, SeqId, State), - {MsgStatus #msg_status { - msg_props = MsgProps #message_properties { needs_confirming = false } }, - State1}. + case remove_pending_ack(false, SeqId, State) of + {none, _} -> + {none, State}; + {#msg_status { msg_props = MsgProps } = MsgStatus, State1} -> + {MsgStatus #msg_status { + msg_props = MsgProps #message_properties { needs_confirming = false } }, + State1} + end. beta_limit(Q) -> case ?QUEUE:peek(Q) of diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl index b933c31402..287488b28b 100644 --- a/src/rabbit_vhost_limit.erl +++ b/src/rabbit_vhost_limit.erl @@ -124,9 +124,10 @@ is_over_queue_limit(VirtualHost) -> %%---------------------------------------------------------------------------- parse_set(VHost, Defn) -> - case rabbit_misc:json_decode(Defn) of - {ok, JSON} -> - set(VHost, rabbit_misc:json_to_term(JSON)); + Definition = rabbit_data_coercion:to_binary(Defn), + case rabbit_json:try_decode(Definition) of + {ok, Term} -> + set(VHost, maps:to_list(Term)); error -> {error_string, "JSON decoding error"} end. diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl index 1e3e65abc4..7a6e290490 100644 --- a/src/rabbit_vm.erl +++ b/src/rabbit_vm.erl @@ -18,8 +18,7 @@ -export([memory/0, binary/0, ets_tables_memory/1]). --define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs", - "rfc4627_jsonrpc"]). +-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]). %%---------------------------------------------------------------------------- |
