summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/gm.erl19
-rw-r--r--src/rabbit.app.src5
-rw-r--r--src/rabbit.erl6
-rw-r--r--src/rabbit_amqqueue_process.erl8
-rw-r--r--src/rabbit_diagnostics.erl1
-rw-r--r--src/rabbit_mirror_queue_slave.erl7
-rw-r--r--src/rabbit_mnesia.erl30
-rw-r--r--src/rabbit_mnesia_rename.erl2
-rw-r--r--src/rabbit_policy.erl15
-rw-r--r--src/rabbit_runtime_parameters.erl15
-rw-r--r--src/rabbit_table.erl65
-rw-r--r--src/rabbit_variable_queue.erl111
-rw-r--r--src/rabbit_vhost_limit.erl7
-rw-r--r--src/rabbit_vm.erl3
14 files changed, 181 insertions, 113 deletions
diff --git a/src/gm.erl b/src/gm.erl
index aa4ffcf511..3a1459fea4 100644
--- a/src/gm.erl
+++ b/src/gm.erl
@@ -1175,11 +1175,20 @@ record_new_member_in_group(NewMember, Left, GroupName, TxnFun) ->
try
Group = #gm_group { members = Members, version = Ver } =
check_membership(Left, read_group(GroupName)),
- {Prefix, [Left | Suffix]} =
- lists:splitwith(fun (M) -> M =/= Left end, Members),
- write_group(Group #gm_group {
- members = Prefix ++ [Left, NewMember | Suffix],
- version = Ver + 1 })
+ case lists:member(NewMember, Members) of
+ true ->
+            %% This avoids duplicate members during partial partitions,
+            %% when nodes can hold inconsistent views of the group
+ rabbit_log:warning("(~p) GM avoiding duplicate of ~p",
+ [self(), NewMember]),
+ Group;
+ false ->
+ {Prefix, [Left | Suffix]} =
+ lists:splitwith(fun (M) -> M =/= Left end, Members),
+ write_group(Group #gm_group {
+ members = Prefix ++ [Left, NewMember | Suffix],
+ version = Ver + 1 })
+ end
catch
lost_membership ->
%% The transaction must not be abruptly crashed, but
diff --git a/src/rabbit.app.src b/src/rabbit.app.src
index c06f7630fa..250b14bc32 100644
--- a/src/rabbit.app.src
+++ b/src/rabbit.app.src
@@ -12,7 +12,7 @@
rabbit_direct_client_sup]},
%% FIXME: Remove goldrush, once rabbit_plugins.erl knows how to ignore
%% indirect dependencies of rabbit.
- {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl]},
+ {applications, [kernel, stdlib, sasl, mnesia, goldrush, lager, rabbit_common, ranch, os_mon, xmerl, jsx]},
%% we also depend on crypto, public_key and ssl but they shouldn't be
%% in here as we don't actually want to start it
{mod, {rabbit, []}},
@@ -47,7 +47,8 @@
{server_properties, []},
{collect_statistics, none},
{collect_statistics_interval, 5000},
- {mnesia_table_loading_timeout, 30000},
+ {mnesia_table_loading_retry_timeout, 30000},
+ {mnesia_table_loading_retry_limit, 10},
{auth_mechanisms, ['PLAIN', 'AMQPLAIN']},
{auth_backends, [rabbit_auth_backend_internal]},
{delegate_count, 16},
diff --git a/src/rabbit.erl b/src/rabbit.erl
index 2581fd49f6..6dde2aca88 100644
--- a/src/rabbit.erl
+++ b/src/rabbit.erl
@@ -19,7 +19,7 @@
-behaviour(application).
-export([start/0, boot/0, stop/0,
- stop_and_halt/0, await_startup/0, status/0, is_running/0,
+ stop_and_halt/0, await_startup/0, status/0, is_running/0, alarms/0,
is_running/1, environment/0, rotate_logs/0, force_event_refresh/1,
start_fhc/0]).
-export([start/2, stop/1, prep_stop/1]).
@@ -1069,9 +1069,9 @@ ensure_working_fhc() ->
end,
TestPid = spawn_link(TestFun),
%% Because we are waiting for the test fun, abuse the
- %% 'mnesia_table_loading_timeout' parameter to find a sane timeout
+ %% 'mnesia_table_loading_retry_timeout' parameter to find a sane timeout
%% value.
- Timeout = rabbit_table:wait_timeout(),
+ Timeout = rabbit_table:retry_timeout(),
receive
fhc_ok -> ok;
{'EXIT', TestPid, Exception} -> throw({ensure_working_fhc, Exception})
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index 3e6d961d5f..25555156d6 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -1334,7 +1334,13 @@ handle_cast(policy_changed, State = #q{q = #amqqueue{name = Name}}) ->
%% This also has the side effect of waking us up so we emit a
%% stats event - so event consumers see the changed policy.
{ok, Q} = rabbit_amqqueue:lookup(Name),
- noreply(process_args_policy(State#q{q = Q})).
+ noreply(process_args_policy(State#q{q = Q}));
+
+handle_cast({sync_start, _, _}, State = #q{q = #amqqueue{name = Name}}) ->
+    %% Only a slave should receive this; getting it here means we are a duplicated master
+ rabbit_mirror_queue_misc:log_warning(
+ Name, "Stopping after receiving sync_start from another master", []),
+ stop(State).
handle_info({maybe_expire, Vsn}, State = #q{args_policy_version = Vsn}) ->
case is_unused(State) of
diff --git a/src/rabbit_diagnostics.erl b/src/rabbit_diagnostics.erl
index d28bb9ffd7..e5df1d5baf 100644
--- a/src/rabbit_diagnostics.erl
+++ b/src/rabbit_diagnostics.erl
@@ -64,7 +64,6 @@ maybe_stuck_stacktrace({prim_inet, accept0, _}) -> false;
maybe_stuck_stacktrace({prim_inet, recv0, _}) -> false;
maybe_stuck_stacktrace({rabbit_heartbeat, heartbeater, _}) -> false;
maybe_stuck_stacktrace({rabbit_net, recv, _}) -> false;
-maybe_stuck_stacktrace({mochiweb_http, request, _}) -> false;
maybe_stuck_stacktrace({group, _, _}) -> false;
maybe_stuck_stacktrace({shell, _, _}) -> false;
maybe_stuck_stacktrace({io, _, _}) -> false;
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 29ba21d374..61623c9441 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -336,7 +336,12 @@ handle_cast({set_ram_duration_target, Duration},
State = #state { backing_queue = BQ,
backing_queue_state = BQS }) ->
BQS1 = BQ:set_ram_duration_target(Duration, BQS),
- noreply(State #state { backing_queue_state = BQS1 }).
+ noreply(State #state { backing_queue_state = BQS1 });
+
+handle_cast(policy_changed, State) ->
+    %% During partial partitions, we might receive messages intended for the master.
+    %% Ignore them.
+ noreply(State).
handle_info(update_ram_duration, State = #state{backing_queue = BQ,
backing_queue_state = BQS}) ->
diff --git a/src/rabbit_mnesia.erl b/src/rabbit_mnesia.erl
index d66f5b8fab..1ec9a46880 100644
--- a/src/rabbit_mnesia.erl
+++ b/src/rabbit_mnesia.erl
@@ -107,7 +107,7 @@ init() ->
false ->
NodeType = node_type(),
init_db_and_upgrade(cluster_nodes(all), NodeType,
- NodeType =:= ram)
+ NodeType =:= ram, _Retry = true)
end,
%% We intuitively expect the global name server to be synced when
%% Mnesia is up. In fact that's not guaranteed to be the case -
@@ -141,7 +141,7 @@ init_from_config() ->
e(invalid_cluster_nodes_conf)
end,
case DiscoveredNodes of
- [] -> init_db_and_upgrade([node()], disc, false);
+ [] -> init_db_and_upgrade([node()], disc, false, _Retry = true);
_ ->
rabbit_log:info("Discovered peer nodes: ~s~n",
[rabbit_peer_discovery:format_discovered_nodes(DiscoveredNodes)]),
@@ -153,14 +153,14 @@ auto_cluster(TryNodes, NodeType) ->
{ok, Node} ->
rabbit_log:info("Node '~p' selected for auto-clustering~n", [Node]),
{ok, {_, DiscNodes, _}} = discover_cluster0(Node),
- init_db_and_upgrade(DiscNodes, NodeType, true),
+ init_db_and_upgrade(DiscNodes, NodeType, true, _Retry = true),
rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster();
none ->
rabbit_log:warning(
"Could not find any node for auto-clustering from: ~p~n"
"Starting blank node...~n", [TryNodes]),
- init_db_and_upgrade([node()], disc, false)
+ init_db_and_upgrade([node()], disc, false, _Retry = true)
end.
%% Make the node join a cluster. The node will be reset automatically
@@ -200,7 +200,7 @@ join_cluster(DiscoveryNode, NodeType) ->
rabbit_log:info("Clustering with ~p as ~p node~n",
[ClusterNodes, NodeType]),
ok = init_db_with_mnesia(ClusterNodes, NodeType,
- true, true),
+ true, true, _Retry = true),
rabbit_connection_tracking:boot(),
rabbit_node_monitor:notify_joined_cluster(),
ok;
@@ -240,7 +240,7 @@ reset_gracefully() ->
%% need to check for consistency because we are resetting.
%% Force=true here so that reset still works when clustered with a
%% node which is down.
- init_db_with_mnesia(AllNodes, node_type(), false, false),
+ init_db_with_mnesia(AllNodes, node_type(), false, false, _Retry = false),
case is_only_clustered_disc_node() of
true -> e(resetting_only_disc_node);
false -> ok
@@ -289,7 +289,7 @@ update_cluster_nodes(DiscoveryNode) ->
rabbit_node_monitor:write_cluster_status(Status),
rabbit_log:info("Updating cluster nodes from ~p~n",
[DiscoveryNode]),
- init_db_with_mnesia(AllNodes, node_type(), true, true);
+ init_db_with_mnesia(AllNodes, node_type(), true, true, _Retry = false);
false ->
e(inconsistent_cluster)
end,
@@ -339,7 +339,7 @@ remove_node_offline_node(Node) ->
%% is by force loading the table, and making sure that
%% they are loaded.
rabbit_table:force_load(),
- rabbit_table:wait_for_replicated(),
+ rabbit_table:wait_for_replicated(_Retry = false),
%% We skip the 'node_deleted' event because the
%% application is stopped and thus, rabbit_event is not
%% enabled.
@@ -487,7 +487,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
{[_ | _], _, _} ->
%% Subsequent node in cluster, catch up
maybe_force_load(),
- ok = rabbit_table:wait_for_replicated(),
+ ok = rabbit_table:wait_for_replicated(_Retry = true),
ok = rabbit_table:create_local_copy(NodeType)
end,
ensure_schema_integrity(),
@@ -497,7 +497,7 @@ init_db(ClusterNodes, NodeType, CheckOtherNodes) ->
init_db_unchecked(ClusterNodes, NodeType) ->
init_db(ClusterNodes, NodeType, false).
-init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
+init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry) ->
ok = init_db(ClusterNodes, NodeType, CheckOtherNodes),
ok = case rabbit_upgrade:maybe_upgrade_local() of
ok -> ok;
@@ -512,14 +512,14 @@ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes) ->
disc -> ok
end,
%% ...and all nodes will need to wait for tables
- rabbit_table:wait_for_replicated(),
+ rabbit_table:wait_for_replicated(Retry),
ok.
init_db_with_mnesia(ClusterNodes, NodeType,
- CheckOtherNodes, CheckConsistency) ->
+ CheckOtherNodes, CheckConsistency, Retry) ->
start_mnesia(CheckConsistency),
try
- init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes)
+ init_db_and_upgrade(ClusterNodes, NodeType, CheckOtherNodes, Retry)
after
stop_mnesia()
end.
@@ -556,7 +556,7 @@ ensure_mnesia_not_running() ->
end.
ensure_schema_integrity() ->
- case rabbit_table:check_schema_integrity() of
+ case rabbit_table:check_schema_integrity(_Retry = true) of
ok ->
ok;
{error, Reason} ->
@@ -687,7 +687,7 @@ discover_cluster0(Node) ->
rpc:call(Node, rabbit_mnesia, cluster_status_from_mnesia, []).
schema_ok_or_move() ->
- case rabbit_table:check_schema_integrity() of
+ case rabbit_table:check_schema_integrity(_Retry = false) of
ok ->
ok;
{error, Reason} ->
diff --git a/src/rabbit_mnesia_rename.erl b/src/rabbit_mnesia_rename.erl
index 0c3e7c2366..2d7e0f56b6 100644
--- a/src/rabbit_mnesia_rename.erl
+++ b/src/rabbit_mnesia_rename.erl
@@ -193,7 +193,7 @@ delete_rename_files() -> ok = rabbit_file:recursive_delete([dir()]).
start_mnesia() -> rabbit_misc:ensure_ok(mnesia:start(), cannot_start_mnesia),
rabbit_table:force_load(),
- rabbit_table:wait_for_replicated().
+ rabbit_table:wait_for_replicated(_Retry = false).
stop_mnesia() -> stopped = mnesia:stop().
convert_backup(NodeMap, FromBackup, ToBackup) ->
diff --git a/src/rabbit_policy.erl b/src/rabbit_policy.erl
index 7e39164882..cfbf116cbd 100644
--- a/src/rabbit_policy.erl
+++ b/src/rabbit_policy.erl
@@ -212,11 +212,12 @@ parse_set(Type, VHost, Name, Pattern, Definition, Priority, ApplyTo) ->
end.
parse_set0(Type, VHost, Name, Pattern, Defn, Priority, ApplyTo) ->
- case rabbit_misc:json_decode(Defn) of
- {ok, JSON} ->
+ Definition = rabbit_data_coercion:to_binary(Defn),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} ->
set0(Type, VHost, Name,
[{<<"pattern">>, list_to_binary(Pattern)},
- {<<"definition">>, rabbit_misc:json_to_term(JSON)},
+ {<<"definition">>, maps:to_list(Term)},
{<<"priority">>, Priority},
{<<"apply-to">>, ApplyTo}]);
error ->
@@ -270,7 +271,7 @@ list_op(VHost) ->
list0_op(VHost, fun ident/1).
list_formatted_op(VHost) ->
- order_policies(list0_op(VHost, fun format/1)).
+ order_policies(list0_op(VHost, fun rabbit_json:encode/1)).
list_formatted_op(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(AggregatorPid, Ref,
@@ -288,7 +289,7 @@ list(VHost) ->
list0(VHost, fun ident/1).
list_formatted(VHost) ->
- order_policies(list0(VHost, fun format/1)).
+ order_policies(list0(VHost, fun rabbit_json:encode/1)).
list_formatted(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(AggregatorPid, Ref,
@@ -309,10 +310,6 @@ p(Parameter, DefnFun) ->
{definition, DefnFun(pget(<<"definition">>, Value))},
{priority, pget(<<"priority">>, Value)}].
-format(Term) ->
- {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
- list_to_binary(JSON).
-
ident(X) -> X.
info_keys() -> [vhost, name, 'apply-to', pattern, definition, priority].
diff --git a/src/rabbit_runtime_parameters.erl b/src/rabbit_runtime_parameters.erl
index 97f78da8ba..072a48be3d 100644
--- a/src/rabbit_runtime_parameters.erl
+++ b/src/rabbit_runtime_parameters.erl
@@ -97,9 +97,10 @@
parse_set(_, <<"policy">>, _, _, _) ->
{error_string, "policies may not be set using this method"};
parse_set(VHost, Component, Name, String, User) ->
- case rabbit_misc:json_decode(String) of
- {ok, JSON} -> set(VHost, Component, Name,
- rabbit_misc:json_to_term(JSON), User);
+ Definition = rabbit_data_coercion:to_binary(String),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} when is_map(Term) -> set(VHost, Component, Name, maps:to_list(Term), User);
+ {ok, Term} -> set(VHost, Component, Name, Term, User);
error -> {error_string, "JSON decoding error"}
end.
@@ -235,12 +236,12 @@ list(VHost, Component) ->
end).
list_formatted(VHost) ->
- [pset(value, format(pget(value, P)), P) || P <- list(VHost)].
+ [pset(value, rabbit_json:encode(pget(value, P)), P) || P <- list(VHost)].
list_formatted(VHost, Ref, AggregatorPid) ->
rabbit_control_misc:emitting_map(
AggregatorPid, Ref,
- fun(P) -> pset(value, format(pget(value, P)), P) end, list(VHost)).
+ fun(P) -> pset(value, rabbit_json:encode(pget(value, P)), P) end, list(VHost)).
lookup(VHost, Component, Name) ->
case lookup0({VHost, Component, Name}, rabbit_misc:const(not_found)) of
@@ -303,10 +304,6 @@ lookup_component(Component) ->
{ok, Module} -> {ok, Module}
end.
-format(Term) ->
- {ok, JSON} = rabbit_misc:json_encode(rabbit_misc:term_to_json(Term)),
- list_to_binary(JSON).
-
flatten_errors(L) ->
case [{F, A} || I <- lists:flatten([L]), {error, F, A} <- [I]] of
[] -> ok;
diff --git a/src/rabbit_table.erl b/src/rabbit_table.erl
index 1bb19b23da..c8946e179d 100644
--- a/src/rabbit_table.erl
+++ b/src/rabbit_table.erl
@@ -16,24 +16,25 @@
-module(rabbit_table).
--export([create/0, create_local_copy/1, wait_for_replicated/0, wait/1,
+-export([create/0, create_local_copy/1, wait_for_replicated/1, wait/1,
force_load/0, is_present/0, is_empty/0, needs_default_data/0,
- check_schema_integrity/0, clear_ram_only_tables/0, wait_timeout/0]).
+ check_schema_integrity/1, clear_ram_only_tables/0, retry_timeout/0]).
-include("rabbit.hrl").
%%----------------------------------------------------------------------------
+-type retry() :: boolean().
-spec create() -> 'ok'.
-spec create_local_copy('disc' | 'ram') -> 'ok'.
--spec wait_for_replicated() -> 'ok'.
+-spec wait_for_replicated(retry()) -> 'ok'.
-spec wait([atom()]) -> 'ok'.
--spec wait_timeout() -> non_neg_integer() | infinity.
+-spec retry_timeout() -> {non_neg_integer() | infinity, non_neg_integer()}.
-spec force_load() -> 'ok'.
-spec is_present() -> boolean().
-spec is_empty() -> boolean().
-spec needs_default_data() -> boolean().
--spec check_schema_integrity() -> rabbit_types:ok_or_error(any()).
+-spec check_schema_integrity(retry()) -> rabbit_types:ok_or_error(any()).
-spec clear_ram_only_tables() -> 'ok'.
%%----------------------------------------------------------------------------
@@ -75,25 +76,53 @@ create_local_copy(ram) ->
create_local_copies(ram),
create_local_copy(schema, ram_copies).
-wait_for_replicated() ->
+wait_for_replicated(Retry) ->
wait([Tab || {Tab, TabDef} <- definitions(),
- not lists:member({local_content, true}, TabDef)]).
+ not lists:member({local_content, true}, TabDef)], Retry).
wait(TableNames) ->
+ wait(TableNames, _Retry = false).
+
+wait(TableNames, Retry) ->
+ {Timeout, Retries} = retry_timeout(Retry),
+ wait(TableNames, Timeout, Retries).
+
+wait(TableNames, Timeout, Retries) ->
%% We might be in ctl here for offline ops, in which case we can't
%% get_env() for the rabbit app.
- Timeout = wait_timeout(),
- case mnesia:wait_for_tables(TableNames, Timeout) of
- ok ->
+ rabbit_log:info("Waiting for Mnesia tables for ~p ms, ~p retries left~n",
+ [Timeout, Retries - 1]),
+ Result = case mnesia:wait_for_tables(TableNames, Timeout) of
+ ok ->
+ ok;
+ {timeout, BadTabs} ->
+ {error, {timeout_waiting_for_tables, BadTabs}};
+ {error, Reason} ->
+ {error, {failed_waiting_for_tables, Reason}}
+ end,
+ case {Retries, Result} of
+ {_, ok} ->
ok;
- {timeout, BadTabs} ->
- throw({error, {timeout_waiting_for_tables, BadTabs}});
- {error, Reason} ->
- throw({error, {failed_waiting_for_tables, Reason}})
+ {1, {error, _} = Error} ->
+ throw(Error);
+ {_, {error, Error}} ->
+ rabbit_log:warning("Error while waiting for Mnesia tables: ~p~n", [Error]),
+ wait(TableNames, Timeout, Retries - 1);
+ _ ->
+ wait(TableNames, Timeout, Retries - 1)
end.
-wait_timeout() ->
- case application:get_env(rabbit, mnesia_table_loading_timeout) of
+retry_timeout(_Retry = false) ->
+ {retry_timeout(), 1};
+retry_timeout(_Retry = true) ->
+ Retries = case application:get_env(rabbit, mnesia_table_loading_retry_limit) of
+ {ok, T} -> T;
+ undefined -> 10
+ end,
+ {retry_timeout(), Retries}.
+
+retry_timeout() ->
+ case application:get_env(rabbit, mnesia_table_loading_retry_timeout) of
{ok, T} -> T;
undefined -> 30000
end.
@@ -110,7 +139,7 @@ is_empty(Names) ->
lists:all(fun (Tab) -> mnesia:dirty_first(Tab) == '$end_of_table' end,
Names).
-check_schema_integrity() ->
+check_schema_integrity(Retry) ->
Tables = mnesia:system_info(tables),
case check(fun (Tab, TabDef) ->
case lists:member(Tab, Tables) of
@@ -118,7 +147,7 @@ check_schema_integrity() ->
true -> check_attributes(Tab, TabDef)
end
end) of
- ok -> ok = wait(names()),
+ ok -> wait(names(), Retry),
check(fun check_content/2);
Other -> Other
end.
diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl
index 961246058e..95604d08f4 100644
--- a/src/rabbit_variable_queue.erl
+++ b/src/rabbit_variable_queue.erl
@@ -682,25 +682,28 @@ ack([], State) ->
%% optimisation: this head is essentially a partial evaluation of the
%% general case below, for the single-ack case.
ack([SeqId], State) ->
- {#msg_status { msg_id = MsgId,
- is_persistent = IsPersistent,
- msg_in_store = MsgInStore,
- index_on_disk = IndexOnDisk },
- State1 = #vqstate { index_state = IndexState,
- msg_store_clients = MSCState,
- ack_out_counter = AckOutCount }} =
- remove_pending_ack(true, SeqId, State),
- IndexState1 = case IndexOnDisk of
- true -> rabbit_queue_index:ack([SeqId], IndexState);
- false -> IndexState
- end,
- case MsgInStore of
- true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
- false -> ok
- end,
- {[MsgId],
- a(State1 #vqstate { index_state = IndexState1,
- ack_out_counter = AckOutCount + 1 })};
+ case remove_pending_ack(true, SeqId, State) of
+ {none, _} ->
+ State;
+ {#msg_status { msg_id = MsgId,
+ is_persistent = IsPersistent,
+ msg_in_store = MsgInStore,
+ index_on_disk = IndexOnDisk },
+ State1 = #vqstate { index_state = IndexState,
+ msg_store_clients = MSCState,
+ ack_out_counter = AckOutCount }} ->
+ IndexState1 = case IndexOnDisk of
+ true -> rabbit_queue_index:ack([SeqId], IndexState);
+ false -> IndexState
+ end,
+ case MsgInStore of
+ true -> ok = msg_store_remove(MSCState, IsPersistent, [MsgId]);
+ false -> ok
+ end,
+ {[MsgId],
+ a(State1 #vqstate { index_state = IndexState1,
+ ack_out_counter = AckOutCount + 1 })}
+ end;
ack(AckTags, State) ->
{{IndexOnDiskSeqIds, MsgIdsByStore, AllMsgIds},
State1 = #vqstate { index_state = IndexState,
@@ -708,8 +711,12 @@ ack(AckTags, State) ->
ack_out_counter = AckOutCount }} =
lists:foldl(
fun (SeqId, {Acc, State2}) ->
- {MsgStatus, State3} = remove_pending_ack(true, SeqId, State2),
- {accumulate_ack(MsgStatus, Acc), State3}
+ case remove_pending_ack(true, SeqId, State2) of
+ {none, _} ->
+ {Acc, State2};
+ {MsgStatus, State3} ->
+ {accumulate_ack(MsgStatus, Acc), State3}
+ end
end, {accumulate_ack_init(), State}, AckTags),
IndexState1 = rabbit_queue_index:ack(IndexOnDiskSeqIds, IndexState),
remove_msgs_by_id(MsgIdsByStore, MSCState),
@@ -2030,8 +2037,12 @@ lookup_pending_ack(SeqId, #vqstate { ram_pending_ack = RPA,
%% First parameter = UpdateStats
remove_pending_ack(true, SeqId, State) ->
- {MsgStatus, State1} = remove_pending_ack(false, SeqId, State),
- {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)};
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {MsgStatus, State1} ->
+ {MsgStatus, stats({0, -1}, {MsgStatus, none}, State1)}
+ end;
remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
disk_pending_ack = DPA,
qi_pending_ack = QPA}) ->
@@ -2043,9 +2054,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA,
DPA1 = gb_trees:delete(SeqId, DPA),
{V, State#vqstate{disk_pending_ack = DPA1}};
none ->
- QPA1 = gb_trees:delete(SeqId, QPA),
- {gb_trees:get(SeqId, QPA),
- State#vqstate{qi_pending_ack = QPA1}}
+ case gb_trees:lookup(SeqId, QPA) of
+ {value, V} ->
+ QPA1 = gb_trees:delete(SeqId, QPA),
+ {V, State#vqstate{qi_pending_ack = QPA1}};
+ none ->
+ {none, State}
+ end
end
end.
@@ -2196,11 +2211,15 @@ queue_merge([SeqId | Rest] = SeqIds, Q, Front, MsgIds,
Limit, PubFun, State);
{_, _Q1} ->
%% enqueue from the remaining list of sequence ids
- {MsgStatus, State1} = msg_from_pending_ack(SeqId, State),
- {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
- PubFun(MsgStatus, State1),
- queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
- Limit, PubFun, State2)
+ case msg_from_pending_ack(SeqId, State) of
+ {none, _} ->
+ queue_merge(Rest, Q, Front, MsgIds, Limit, PubFun, State);
+ {MsgStatus, State1} ->
+ {#msg_status { msg_id = MsgId } = MsgStatus1, State2} =
+ PubFun(MsgStatus, State1),
+ queue_merge(Rest, Q, ?QUEUE:in(MsgStatus1, Front), [MsgId | MsgIds],
+ Limit, PubFun, State2)
+ end
end;
queue_merge(SeqIds, Q, Front, MsgIds,
_Limit, _PubFun, State) ->
@@ -2209,22 +2228,28 @@ queue_merge(SeqIds, Q, Front, MsgIds,
delta_merge([], Delta, MsgIds, State) ->
{Delta, MsgIds, State};
delta_merge(SeqIds, Delta, MsgIds, State) ->
- lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0}) ->
- {#msg_status { msg_id = MsgId } = MsgStatus, State1} =
- msg_from_pending_ack(SeqId, State0),
- {_MsgStatus, State2} =
- maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
- {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
- stats({1, -1}, {MsgStatus, none}, State2)}
+ lists:foldl(fun (SeqId, {Delta0, MsgIds0, State0} = Acc) ->
+ case msg_from_pending_ack(SeqId, State0) of
+ {none, _} ->
+ Acc;
+ {#msg_status { msg_id = MsgId } = MsgStatus, State1} ->
+ {_MsgStatus, State2} =
+ maybe_prepare_write_to_disk(true, true, MsgStatus, State1),
+ {expand_delta(SeqId, Delta0), [MsgId | MsgIds0],
+ stats({1, -1}, {MsgStatus, none}, State2)}
+ end
end, {Delta, MsgIds, State}, SeqIds).
%% Mostly opposite of record_pending_ack/2
msg_from_pending_ack(SeqId, State) ->
- {#msg_status { msg_props = MsgProps } = MsgStatus, State1} =
- remove_pending_ack(false, SeqId, State),
- {MsgStatus #msg_status {
- msg_props = MsgProps #message_properties { needs_confirming = false } },
- State1}.
+ case remove_pending_ack(false, SeqId, State) of
+ {none, _} ->
+ {none, State};
+ {#msg_status { msg_props = MsgProps } = MsgStatus, State1} ->
+ {MsgStatus #msg_status {
+ msg_props = MsgProps #message_properties { needs_confirming = false } },
+ State1}
+ end.
beta_limit(Q) ->
case ?QUEUE:peek(Q) of
diff --git a/src/rabbit_vhost_limit.erl b/src/rabbit_vhost_limit.erl
index b933c31402..287488b28b 100644
--- a/src/rabbit_vhost_limit.erl
+++ b/src/rabbit_vhost_limit.erl
@@ -124,9 +124,10 @@ is_over_queue_limit(VirtualHost) ->
%%----------------------------------------------------------------------------
parse_set(VHost, Defn) ->
- case rabbit_misc:json_decode(Defn) of
- {ok, JSON} ->
- set(VHost, rabbit_misc:json_to_term(JSON));
+ Definition = rabbit_data_coercion:to_binary(Defn),
+ case rabbit_json:try_decode(Definition) of
+ {ok, Term} ->
+ set(VHost, maps:to_list(Term));
error ->
{error_string, "JSON decoding error"}
end.
diff --git a/src/rabbit_vm.erl b/src/rabbit_vm.erl
index 1e3e65abc4..7a6e290490 100644
--- a/src/rabbit_vm.erl
+++ b/src/rabbit_vm.erl
@@ -18,8 +18,7 @@
-export([memory/0, binary/0, ets_tables_memory/1]).
--define(MAGIC_PLUGINS, ["mochiweb", "webmachine", "cowboy", "sockjs",
- "rfc4627_jsonrpc"]).
+-define(MAGIC_PLUGINS, ["cowboy", "ranch", "sockjs"]).
%%----------------------------------------------------------------------------