summaryrefslogtreecommitdiff
path: root/deps/rabbitmq_mqtt/src
diff options
context:
space:
mode:
authordcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
committerdcorbacho <dparracorbacho@piotal.io>2020-11-18 14:27:41 +0000
commitf23a51261d9502ec39df0f8db47ba6b22aa7659f (patch)
tree53dcdf46e7dc2c14e81ee960bce8793879b488d3 /deps/rabbitmq_mqtt/src
parentafa2c2bf6c7e0e9b63f4fb53dc931c70388e1c82 (diff)
parent9f6d64ec4a4b1eeac24d7846c5c64fd96798d892 (diff)
downloadrabbitmq-server-git-stream-timestamp-offset.tar.gz
Merge remote-tracking branch 'origin/master' into stream-timestamp-offsetstream-timestamp-offset
Diffstat (limited to 'deps/rabbitmq_mqtt/src')
-rw-r--r--deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl68
-rw-r--r--deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl87
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_machine.erl134
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_node.erl132
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt.erl55
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl88
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl25
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl43
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl224
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl45
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl1054
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl480
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl23
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl54
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl54
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl31
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl98
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl60
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl73
-rw-r--r--deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl139
20 files changed, 2967 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl
new file mode 100644
index 0000000000..f0aefb526b
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl
@@ -0,0 +1,68 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand').
+
+-include("rabbit_mqtt.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([scopes/0,
+ switches/0,
+ aliases/0,
+ usage/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+scopes() -> [ctl].
+switches() -> [].
+aliases() -> [].
+
+description() -> <<"Removes cluster member and permanently deletes its cluster-wide MQTT state">>.
+
+help_section() ->
+ {plugin, mqtt}.
+
+validate([], _Opts) ->
+ {validation_failure, not_enough_args};
+validate([_, _ | _], _Opts) ->
+ {validation_failure, too_many_args};
+validate([_], _) ->
+ ok.
+
+merge_defaults(Args, Opts) ->
+ {Args, Opts}.
+
+usage() ->
+ <<"decommission_mqtt_node <node>">>.
+
+usage_doc_guides() ->
+ [?MQTT_GUIDE_URL].
+
+run([Node], #{node := NodeName,
+ timeout := Timeout}) ->
+ case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of
+ {badrpc, _} = Error ->
+ Error;
+ nodedown ->
+ {ok, list_to_binary(io_lib:format("Node ~s is down but has been successfully removed"
+ " from the cluster", [Node]))};
+ Result ->
+ %% 'ok' or 'timeout'
+ %% TODO: Ra will timeout if the node is not a cluster member - should this be fixed??
+ Result
+ end.
+
+banner([Node], _) -> list_to_binary(io_lib:format("Removing node ~s from the list of MQTT nodes...", [Node])).
+
+output(Result, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl
new file mode 100644
index 0000000000..a5745a7f58
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl
@@ -0,0 +1,87 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+
+-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand').
+
+-include("rabbit_mqtt.hrl").
+
+-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour').
+
+-export([formatter/0,
+ scopes/0,
+ switches/0,
+ aliases/0,
+ usage/0,
+ usage_additional/0,
+ usage_doc_guides/0,
+ banner/2,
+ validate/2,
+ merge_defaults/2,
+ run/2,
+ output/2,
+ description/0,
+ help_section/0]).
+
+formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'.
+scopes() -> [ctl, diagnostics].
+switches() -> [{verbose, boolean}].
+aliases() -> [{'V', verbose}].
+
+description() -> <<"Lists MQTT connections on the target node">>.
+
+help_section() ->
+ {plugin, mqtt}.
+
+validate(Args, _) ->
+ case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args,
+ ?INFO_ITEMS) of
+ {ok, _} -> ok;
+ Error -> Error
+ end.
+
+merge_defaults([], Opts) ->
+ merge_defaults([<<"client_id">>, <<"conn_name">>], Opts);
+merge_defaults(Args, Opts) ->
+ {Args, maps:merge(#{verbose => false}, Opts)}.
+
+usage() ->
+ <<"list_mqtt_connections [<column> ...]">>.
+
+usage_additional() ->
+ Prefix = <<" must be one of ">>,
+ InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>),
+ [
+ {<<"<column>">>, <<Prefix/binary, InfoItems/binary>>}
+ ].
+
+usage_doc_guides() ->
+ [?MQTT_GUIDE_URL].
+
+run(Args, #{node := NodeName,
+ timeout := Timeout,
+ verbose := Verbose}) ->
+ InfoKeys = case Verbose of
+ true -> ?INFO_ITEMS;
+ false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args)
+ end,
+
+ %% a node uses the Raft-based collector to list connections, which knows about all connections in the cluster
+ %% so no need to reach out to all the nodes
+ Nodes = [NodeName],
+
+ 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items(
+ NodeName,
+ rabbit_mqtt,
+ emit_connection_info_all,
+ [Nodes, InfoKeys],
+ Timeout,
+ InfoKeys,
+ length(Nodes)).
+
+banner(_, _) -> <<"Listing MQTT connections ...">>.
+
+output(Result, _Opts) ->
+ 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result).
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
new file mode 100644
index 0000000000..334aa9e32c
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
@@ -0,0 +1,134 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(mqtt_machine).
+-behaviour(ra_machine).
+
+-include("mqtt_machine.hrl").
+
+-export([init/1,
+ apply/3,
+ state_enter/2,
+ notify_connection/2]).
+
+-type state() :: #machine_state{}.
+
+-type config() :: map().
+
+-type reply() :: {ok, term()} | {error, term()}.
+-type client_id() :: term().
+
+-type command() :: {register, client_id(), pid()} |
+ {unregister, client_id(), pid()} |
+ list.
+
+-spec init(config()) -> state().
+init(_Conf) ->
+ #machine_state{}.
+
+-spec apply(map(), command(), state()) ->
+ {state(), reply(), ra_machine:effects()}.
+apply(_Meta, {register, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
+ {Effects, Ids1} =
+ case maps:find(ClientId, Ids) of
+ {ok, OldPid} when Pid =/= OldPid ->
+ Effects0 = [{demonitor, process, OldPid},
+ {monitor, process, Pid},
+ {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}],
+ {Effects0, maps:remove(ClientId, Ids)};
+ _ ->
+ Effects0 = [{monitor, process, Pid}],
+ {Effects0, Ids}
+ end,
+ State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1)},
+ {State, ok, Effects};
+
+apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) ->
+ State = case maps:find(ClientId, Ids) of
+ {ok, Pid} -> State0#machine_state{client_ids = maps:remove(ClientId, Ids)};
+ %% don't delete client id that might belong to a newer connection
+ %% that kicked the one with Pid out
+ {ok, _AnotherPid} -> State0;
+ error -> State0
+ end,
+ Effects0 = [{demonitor, process, Pid}],
+ %% snapshot only when the map has changed
+ Effects = case State of
+ State0 -> Effects0;
+ _ -> Effects0 ++ snapshot_effects(Meta, State)
+ end,
+ {State, ok, Effects};
+
+apply(_Meta, {down, DownPid, noconnection}, State) ->
+ %% Monitor the node the pid is on (see {nodeup, Node} below)
+ %% so that we can detect when the node is re-connected and discover the
+ %% actual fate of the connection processes on it
+ Effect = {monitor, node, node(DownPid)},
+ {State, ok, Effect};
+
+apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) ->
+ Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid ->
+ false;
+ (_, _) ->
+ true
+ end, Ids),
+ State = State0#machine_state{client_ids = Ids1},
+ Delta = maps:keys(Ids) -- maps:keys(Ids1),
+ Effects = lists:map(fun(Id) ->
+ [{mod_call, rabbit_log, debug,
+ ["MQTT connection with client id '~s' failed", [Id]]}] end, Delta),
+ {State, ok, Effects ++ snapshot_effects(Meta, State)};
+
+apply(_Meta, {nodeup, Node}, State) ->
+ %% Work out if any pids that were disconnected are still
+ %% alive.
+ %% Re-request the monitor for the pids on the now-back node.
+ Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node],
+ {State, ok, Effects};
+apply(_Meta, {nodedown, _Node}, State) ->
+ {State, ok};
+
+apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) ->
+ Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids),
+ Delta = maps:keys(Ids) -- maps:keys(Ids1),
+
+ Effects = lists:foldl(fun (ClientId, Acc) ->
+ Pid = maps:get(ClientId, Ids),
+ [
+ {demonitor, process, Pid},
+ {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]},
+ {mod_call, rabbit_log, debug,
+ ["MQTT will remove client ID '~s' from known "
+ "as its node has been decommissioned", [ClientId]]}
+ ] ++ Acc
+ end, [], Delta),
+
+ State = State0#machine_state{client_ids = Ids1},
+ {State, ok, Effects ++ snapshot_effects(Meta, State)};
+
+apply(_Meta, Unknown, State) ->
+ error_logger:error_msg("MQTT Raft state machine received unknown command ~p~n", [Unknown]),
+ {State, {error, {unknown_command, Unknown}}, []}.
+
+state_enter(leader, State) ->
+ %% re-request monitors for all known pids, this would clean up
+ %% records for all connections are no longer around, e.g. right after node restart
+ [{monitor, process, Pid} || Pid <- all_pids(State)];
+state_enter(_, _) ->
+ [].
+
+%% ==========================
+
+%% Avoids blocking the Raft leader.
+notify_connection(Pid, Reason) ->
+ spawn(fun() -> gen_server2:cast(Pid, Reason) end).
+
+-spec snapshot_effects(map(), state()) -> ra_machine:effects().
+snapshot_effects(#{index := RaftIdx}, State) ->
+ [{release_cursor, RaftIdx, State}].
+
+all_pids(#machine_state{client_ids = Ids}) ->
+ maps:values(Ids).
diff --git a/deps/rabbitmq_mqtt/src/mqtt_node.erl b/deps/rabbitmq_mqtt/src/mqtt_node.erl
new file mode 100644
index 0000000000..84dcd9b3a4
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/mqtt_node.erl
@@ -0,0 +1,132 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(mqtt_node).
+
+-export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0]).
+
+-define(ID_NAME, mqtt_node).
+-define(START_TIMEOUT, 100000).
+-define(RETRY_INTERVAL, 5000).
+-define(RA_OPERATION_TIMEOUT, 60000).
+
+node_id() ->
+ server_id(node()).
+
+server_id() ->
+ server_id(node()).
+
+server_id(Node) ->
+ {?ID_NAME, Node}.
+
+all_node_ids() ->
+ [server_id(N) || N <- rabbit_mnesia:cluster_nodes(all),
+ can_participate_in_clientid_tracking(N)].
+
+start() ->
+ %% 3s to 6s randomized
+ Repetitions = rand:uniform(10) + 10,
+ start(300, Repetitions).
+
+start(_Delay, AttemptsLeft) when AttemptsLeft =< 0 ->
+ start_server(),
+ trigger_election();
+start(Delay, AttemptsLeft) ->
+ NodeId = server_id(),
+ Nodes = compatible_peer_servers(),
+ case ra_directory:uid_of(?ID_NAME) of
+ undefined ->
+ case Nodes of
+ [] ->
+ %% Since cluster members are not known ahead of time and initial boot can be happening in parallel,
+ %% we wait and check a few times (up to a few seconds) to see if we can discover any peers to
+ %% join before forming a cluster. This reduces the probability of N independent clusters being
+ %% formed in the common scenario of N nodes booting in parallel e.g. because they were started
+ %% at the same time by a deployment tool.
+ %%
+ %% This scenario does not guarantee single cluster formation but without knowing the list of members
+ %% ahead of time, this is a best effort workaround. Multi-node consensus is apparently hard
+ %% to achieve without having consensus around expected cluster members.
+ rabbit_log:info("MQTT: will wait for ~p more ms for cluster members to join before triggering a Raft leader election", [Delay]),
+ timer:sleep(Delay),
+ start(Delay, AttemptsLeft - 1);
+ Peers ->
+ %% Trigger an election.
+ %% This is required when we start a node for the first time.
+ %% Using default timeout because it supposed to reply fast.
+ rabbit_log:info("MQTT: discovered ~p cluster peers that support client ID tracking", [length(Peers)]),
+ start_server(),
+ join_peers(NodeId, Peers),
+ ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT)
+ end;
+ _ ->
+ join_peers(NodeId, Nodes),
+ ra:restart_server(NodeId),
+ ra:trigger_election(NodeId)
+ end,
+ ok.
+
+compatible_peer_servers() ->
+ all_node_ids() -- [(node_id())].
+
+start_server() ->
+ NodeId = node_id(),
+ Nodes = compatible_peer_servers(),
+ UId = ra:new_uid(ra_lib:to_binary(?ID_NAME)),
+ Timeout = application:get_env(kernel, net_ticktime, 60) + 5,
+ Conf = #{cluster_name => ?ID_NAME,
+ id => NodeId,
+ uid => UId,
+ friendly_name => ?ID_NAME,
+ initial_members => Nodes,
+ log_init_args => #{uid => UId},
+ tick_timeout => Timeout,
+ machine => {module, mqtt_machine, #{}}
+ },
+ ra:start_server(Conf).
+
+trigger_election() ->
+ ra:trigger_election(server_id()).
+
+join_peers(_NodeId, []) ->
+ ok;
+join_peers(NodeId, Nodes) ->
+ join_peers(NodeId, Nodes, 100).
+join_peers(_NodeId, [], _RetriesLeft) ->
+ ok;
+join_peers(_NodeId, _Nodes, RetriesLeft) when RetriesLeft =:= 0 ->
+ rabbit_log:error("MQTT: exhausted all attempts while trying to rejoin cluster peers");
+join_peers(NodeId, Nodes, RetriesLeft) ->
+ case ra:members(Nodes, ?START_TIMEOUT) of
+ {ok, Members, _} ->
+ case lists:member(NodeId, Members) of
+ true -> ok;
+ false -> ra:add_member(Members, NodeId)
+ end;
+ {timeout, _} ->
+ rabbit_log:debug("MQTT: timed out contacting cluster peers, %s retries left", [RetriesLeft]),
+ timer:sleep(?RETRY_INTERVAL),
+ join_peers(NodeId, Nodes, RetriesLeft - 1);
+ Err ->
+ Err
+ end.
+
+-spec leave(node()) -> 'ok' | 'timeout' | 'nodedown'.
+leave(Node) ->
+ NodeId = server_id(),
+ ToLeave = server_id(Node),
+ try
+ ra:leave_and_delete_server(NodeId, ToLeave)
+ catch
+ exit:{{nodedown, Node}, _} ->
+ nodedown
+ end.
+
+can_participate_in_clientid_tracking(Node) ->
+ case rpc:call(Node, mqtt_machine, module_info, []) of
+ {badrpc, _} -> false;
+ _ -> true
+ end.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
new file mode 100644
index 0000000000..192f8a7fee
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl
@@ -0,0 +1,55 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt).
+
+-behaviour(application).
+-export([start/2, stop/1]).
+-export([connection_info_local/1,
+ emit_connection_info_local/3,
+ emit_connection_info_all/4,
+ close_all_client_connections/1]).
+
+start(normal, []) ->
+ {ok, Listeners} = application:get_env(tcp_listeners),
+ {ok, SslListeners} = application:get_env(ssl_listeners),
+ ok = mqtt_node:start(),
+ Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []),
+ EMPid = case rabbit_event:start_link() of
+ {ok, Pid} -> Pid;
+ {error, {already_started, Pid}} -> Pid
+ end,
+ gen_event:add_handler(EMPid, rabbit_mqtt_internal_event_handler, []),
+ Result.
+
+stop(_) ->
+ rabbit_mqtt_sup:stop_listeners().
+
+-spec close_all_client_connections(string() | binary()) -> {'ok', non_neg_integer()}.
+close_all_client_connections(Reason) ->
+ Connections = rabbit_mqtt_collector:list(),
+ [rabbit_mqtt_reader:close_connection(Pid, Reason) || {_, Pid} <- Connections],
+ {ok, length(Connections)}.
+
+emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) ->
+ Pids = [spawn_link(Node, rabbit_mqtt, emit_connection_info_local,
+ [Items, Ref, AggregatorPid])
+ || Node <- Nodes],
+ rabbit_control_misc:await_emitters_termination(Pids),
+ ok.
+
+emit_connection_info_local(Items, Ref, AggregatorPid) ->
+ rabbit_control_misc:emitting_map_with_exit_handler(
+ AggregatorPid, Ref, fun({_, Pid}) ->
+ rabbit_mqtt_reader:info(Pid, Items)
+ end,
+ rabbit_mqtt_collector:list()).
+
+connection_info_local(Items) ->
+ Connections = rabbit_mqtt_collector:list(),
+ [rabbit_mqtt_reader:info(Pid, Items)
+ || {_, Pid} <- Connections].
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
new file mode 100644
index 0000000000..341ee46850
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl
@@ -0,0 +1,88 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_collector).
+
+-include("mqtt_machine.hrl").
+
+-export([register/2, register/3, unregister/2, list/0, leave/1]).
+
+%%----------------------------------------------------------------------------
+-spec register(term(), pid()) -> {ok, reference()} | {error, term()}.
+register(ClientId, Pid) ->
+ {ClusterName, _} = NodeId = mqtt_node:server_id(),
+ case ra_leaderboard:lookup_leader(ClusterName) of
+ undefined ->
+ case ra:members(NodeId) of
+ {ok, _, Leader} ->
+ register(Leader, ClientId, Pid);
+ _ = Error ->
+ Error
+ end;
+ Leader ->
+ register(Leader, ClientId, Pid)
+ end.
+
+-spec register(ra:server_id(), term(), pid()) ->
+ {ok, reference()} | {error, term()}.
+register(ServerId, ClientId, Pid) ->
+ Corr = make_ref(),
+ send_ra_command(ServerId, {register, ClientId, Pid}, Corr),
+ erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}),
+ {ok, Corr}.
+
+unregister(ClientId, Pid) ->
+ {ClusterName, _} = mqtt_node:server_id(),
+ case ra_leaderboard:lookup_leader(ClusterName) of
+ undefined ->
+ ok;
+ Leader ->
+ send_ra_command(Leader, {unregister, ClientId, Pid}, no_correlation)
+ end.
+
+list() ->
+ {ClusterName, _} = mqtt_node:server_id(),
+ QF = fun (#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end,
+ case ra_leaderboard:lookup_leader(ClusterName) of
+ undefined ->
+ NodeIds = mqtt_node:all_node_ids(),
+ case ra:leader_query(NodeIds, QF) of
+ {ok, {_, Ids}, _} -> Ids;
+ {timeout, _} ->
+ rabbit_log:debug("~s:list/0 leader query timed out",
+ [?MODULE]),
+ []
+ end;
+ Leader ->
+ case ra:leader_query(Leader, QF) of
+ {ok, {_, Ids}, _} -> Ids;
+ {error, _} ->
+ [];
+ {timeout, _} ->
+ rabbit_log:debug("~s:list/0 leader query timed out",
+ [?MODULE]),
+ []
+ end
+ end.
+
+leave(NodeBin) ->
+ Node = binary_to_atom(NodeBin, utf8),
+ ServerId = mqtt_node:server_id(),
+ run_ra_command(ServerId, {leave, Node}),
+ mqtt_node:leave(Node).
+
+%%----------------------------------------------------------------------------
+-spec run_ra_command(term(), term()) -> term() | {error, term()}.
+run_ra_command(ServerId, RaCommand) ->
+ case ra:process_command(ServerId, RaCommand) of
+ {ok, Result, _} -> Result;
+ _ = Error -> Error
+ end.
+
+-spec send_ra_command(term(), term(), term()) -> ok.
+send_ra_command(ServerId, RaCommand, Correlation) ->
+ ok = ra:pipeline_command(ServerId, RaCommand, Correlation, normal).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl
new file mode 100644
index 0000000000..4e73a19253
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl
@@ -0,0 +1,25 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2017-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+-module(rabbit_mqtt_connection_info).
+
+%% Module to add the MQTT client ID to authentication properties
+
+%% API
+-export([additional_authn_params/4]).
+
+additional_authn_params(_Creds, _VHost, _Pid, Infos) ->
+ case proplists:get_value(variable_map, Infos, undefined) of
+ VariableMap when is_map(VariableMap) ->
+ case maps:get(<<"client_id">>, VariableMap, []) of
+ ClientId when is_binary(ClientId)->
+ [{client_id, ClientId}];
+ [] ->
+ []
+ end;
+ _ ->
+ []
+ end.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl
new file mode 100644
index 0000000000..0a150caa38
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl
@@ -0,0 +1,43 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_connection_sup).
+
+-behaviour(supervisor2).
+-behaviour(ranch_protocol).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([start_link/4, start_keepalive_link/0]).
+
+-export([init/1]).
+
+%%----------------------------------------------------------------------------
+
+start_link(Ref, _Sock, _Transport, []) ->
+ {ok, SupPid} = supervisor2:start_link(?MODULE, []),
+ {ok, KeepaliveSup} = supervisor2:start_child(
+ SupPid,
+ {rabbit_mqtt_keepalive_sup,
+ {rabbit_mqtt_connection_sup, start_keepalive_link, []},
+ intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}),
+ {ok, ReaderPid} = supervisor2:start_child(
+ SupPid,
+ {rabbit_mqtt_reader,
+ {rabbit_mqtt_reader, start_link, [KeepaliveSup, Ref]},
+ intrinsic, ?WORKER_WAIT, worker, [rabbit_mqtt_reader]}),
+ {ok, SupPid, ReaderPid}.
+
+start_keepalive_link() ->
+ supervisor2:start_link(?MODULE, []).
+
+%%----------------------------------------------------------------------------
+
+init([]) ->
+ {ok, {{one_for_all, 0, 1}, []}}.
+
+
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl
new file mode 100644
index 0000000000..950c5bd6c4
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl
@@ -0,0 +1,224 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_frame).
+
+-export([parse/2, initial_state/0]).
+-export([serialise/1]).
+
+-include("rabbit_mqtt_frame.hrl").
+
+-define(RESERVED, 0).
+-define(MAX_LEN, 16#fffffff).
+-define(HIGHBIT, 2#10000000).
+-define(LOWBITS, 2#01111111).
+
+initial_state() -> none.
+
+parse(<<>>, none) ->
+ {more, fun(Bin) -> parse(Bin, none) end};
+parse(<<MessageType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) ->
+ parse_remaining_len(Rest, #mqtt_frame_fixed{ type = MessageType,
+ dup = bool(Dup),
+ qos = QoS,
+ retain = bool(Retain) });
+parse(Bin, Cont) -> Cont(Bin).
+
+parse_remaining_len(<<>>, Fixed) ->
+ {more, fun(Bin) -> parse_remaining_len(Bin, Fixed) end};
+parse_remaining_len(Rest, Fixed) ->
+ parse_remaining_len(Rest, Fixed, 1, 0).
+
+parse_remaining_len(_Bin, _Fixed, _Multiplier, Length)
+ when Length > ?MAX_LEN ->
+ {error, invalid_mqtt_frame_len};
+parse_remaining_len(<<>>, Fixed, Multiplier, Length) ->
+ {more, fun(Bin) -> parse_remaining_len(Bin, Fixed, Multiplier, Length) end};
+parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) ->
+ parse_remaining_len(Rest, Fixed, Multiplier * ?HIGHBIT, Value + Len * Multiplier);
+parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) ->
+ parse_frame(Rest, Fixed, Value + Len * Multiplier).
+
+parse_frame(Bin, #mqtt_frame_fixed{ type = Type,
+ qos = Qos } = Fixed, Length) ->
+ case {Type, Bin} of
+ {?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} ->
+ {ProtoName, Rest1} = parse_utf(FrameBin),
+ <<ProtoVersion : 8, Rest2/binary>> = Rest1,
+ <<UsernameFlag : 1,
+ PasswordFlag : 1,
+ WillRetain : 1,
+ WillQos : 2,
+ WillFlag : 1,
+ CleanSession : 1,
+ _Reserved : 1,
+ KeepAlive : 16/big,
+ Rest3/binary>> = Rest2,
+ {ClientId, Rest4} = parse_utf(Rest3),
+ {WillTopic, Rest5} = parse_utf(Rest4, WillFlag),
+ {WillMsg, Rest6} = parse_msg(Rest5, WillFlag),
+ {UserName, Rest7} = parse_utf(Rest6, UsernameFlag),
+ {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag),
+ case protocol_name_approved(ProtoVersion, ProtoName) of
+ true ->
+ wrap(Fixed,
+ #mqtt_frame_connect{
+ proto_ver = ProtoVersion,
+ will_retain = bool(WillRetain),
+ will_qos = WillQos,
+ will_flag = bool(WillFlag),
+ clean_sess = bool(CleanSession),
+ keep_alive = KeepAlive,
+ client_id = ClientId,
+ will_topic = WillTopic,
+ will_msg = WillMsg,
+ username = UserName,
+ password = PasssWord}, Rest);
+ false ->
+ {error, protocol_header_corrupt}
+ end;
+ {?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} ->
+ {TopicName, Rest1} = parse_utf(FrameBin),
+ {MessageId, Payload} = case Qos of
+ 0 -> {undefined, Rest1};
+ _ -> <<M:16/big, R/binary>> = Rest1,
+ {M, R}
+ end,
+ wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName,
+ message_id = MessageId },
+ Payload, Rest);
+ {?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} ->
+ <<MessageId:16/big>> = FrameBin,
+ wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest);
+ {Subs, <<FrameBin:Length/binary, Rest/binary>>}
+ when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE ->
+ 1 = Qos,
+ <<MessageId:16/big, Rest1/binary>> = FrameBin,
+ Topics = parse_topics(Subs, Rest1, []),
+ wrap(Fixed, #mqtt_frame_subscribe { message_id = MessageId,
+ topic_table = Topics }, Rest);
+ {Minimal, Rest}
+ when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ ->
+ Length = 0,
+ wrap(Fixed, Rest);
+ {_, TooShortBin} ->
+ {more, fun(BinMore) ->
+ parse_frame(<<TooShortBin/binary, BinMore/binary>>,
+ Fixed, Length)
+ end}
+ end.
+
+parse_topics(_, <<>>, Topics) ->
+ Topics;
+parse_topics(?SUBSCRIBE = Sub, Bin, Topics) ->
+ {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin),
+ parse_topics(Sub, Rest, [#mqtt_topic { name = Name, qos = QoS } | Topics]);
+parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) ->
+ {Name, <<Rest/binary>>} = parse_utf(Bin),
+ parse_topics(Sub, Rest, [#mqtt_topic { name = Name } | Topics]).
+
+wrap(Fixed, Variable, Payload, Rest) ->
+ {ok, #mqtt_frame { variable = Variable, fixed = Fixed, payload = Payload }, Rest}.
+wrap(Fixed, Variable, Rest) ->
+ {ok, #mqtt_frame { variable = Variable, fixed = Fixed }, Rest}.
+wrap(Fixed, Rest) ->
+ {ok, #mqtt_frame { fixed = Fixed }, Rest}.
+
+parse_utf(Bin, 0) ->
+ {undefined, Bin};
+parse_utf(Bin, _) ->
+ parse_utf(Bin).
+
+parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) ->
+ {binary_to_list(Str), Rest}.
+
+parse_msg(Bin, 0) ->
+ {undefined, Bin};
+parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) ->
+ {Msg, Rest}.
+
+bool(0) -> false;
+bool(1) -> true.
+
+%% serialisation
+
+serialise(#mqtt_frame{ fixed = Fixed,
+ variable = Variable,
+ payload = Payload }) ->
+ serialise_variable(Fixed, Variable, serialise_payload(Payload)).
+
+serialise_payload(undefined) -> <<>>;
+serialise_payload(B) when is_binary(B) -> B.
+
+serialise_variable(#mqtt_frame_fixed { type = ?CONNACK } = Fixed,
+ #mqtt_frame_connack { session_present = SessionPresent,
+ return_code = ReturnCode },
+ <<>> = PayloadBin) ->
+ VariableBin = <<?RESERVED:7, (opt(SessionPresent)):1, ReturnCode:8>>,
+ serialise_fixed(Fixed, VariableBin, PayloadBin);
+
+serialise_variable(#mqtt_frame_fixed { type = SubAck } = Fixed,
+ #mqtt_frame_suback { message_id = MessageId,
+ qos_table = Qos },
+ <<>> = _PayloadBin)
+ when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK ->
+ VariableBin = <<MessageId:16/big>>,
+ QosBin = << <<?RESERVED:6, Q:2>> || Q <- Qos >>,
+ serialise_fixed(Fixed, VariableBin, QosBin);
+
+serialise_variable(#mqtt_frame_fixed { type = ?PUBLISH,
+ qos = Qos } = Fixed,
+ #mqtt_frame_publish { topic_name = TopicName,
+ message_id = MessageId },
+ PayloadBin) ->
+ TopicBin = serialise_utf(TopicName),
+ MessageIdBin = case Qos of
+ 0 -> <<>>;
+ 1 -> <<MessageId:16/big>>
+ end,
+ serialise_fixed(Fixed, <<TopicBin/binary, MessageIdBin/binary>>, PayloadBin);
+
+serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed,
+ #mqtt_frame_publish { message_id = MessageId },
+ PayloadBin) ->
+ MessageIdBin = <<MessageId:16/big>>,
+ serialise_fixed(Fixed, MessageIdBin, PayloadBin);
+
+serialise_variable(#mqtt_frame_fixed {} = Fixed,
+ undefined,
+ <<>> = _PayloadBin) ->
+ serialise_fixed(Fixed, <<>>, <<>>).
+
+serialise_fixed(#mqtt_frame_fixed{ type = Type,
+ dup = Dup,
+ qos = Qos,
+ retain = Retain }, VariableBin, PayloadBin)
+ when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT ->
+ Len = size(VariableBin) + size(PayloadBin),
+ true = (Len =< ?MAX_LEN),
+ LenBin = serialise_len(Len),
+ <<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1,
+ LenBin/binary, VariableBin/binary, PayloadBin/binary>>.
+
+serialise_utf(String) ->
+ StringBin = unicode:characters_to_binary(String),
+ Len = size(StringBin),
+ true = (Len =< 16#ffff),
+ <<Len:16/big, StringBin/binary>>.
+
+serialise_len(N) when N =< ?LOWBITS ->
+ <<0:1, N:7>>;
+serialise_len(N) ->
+ <<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>.
+
+opt(undefined) -> ?RESERVED;
+opt(false) -> 0;
+opt(true) -> 1;
+opt(X) when is_integer(X) -> X.
+
+protocol_name_approved(Ver, Name) ->
+ lists:member({Ver, Name}, ?PROTOCOL_NAMES).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl
new file mode 100644
index 0000000000..2a371b4142
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl
@@ -0,0 +1,45 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_internal_event_handler).
+
+-behaviour(gen_event).
+
+-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]).
+
+-import(rabbit_misc, [pget/2]).
+
+init([]) ->
+ {ok, []}.
+
+handle_event({event, vhost_created, Info, _, _}, State) ->
+ Name = pget(name, Info),
+ rabbit_mqtt_retainer_sup:child_for_vhost(Name),
+ {ok, State};
+handle_event({event, vhost_deleted, Info, _, _}, State) ->
+ Name = pget(name, Info),
+ rabbit_mqtt_retainer_sup:delete_child(Name),
+ {ok, State};
+handle_event({event, maintenance_connections_closed, _Info, _, _}, State) ->
+ %% we should close our connections
+ {ok, NConnections} = rabbit_mqtt:close_all_client_connections("node is being put into maintenance mode"),
+ rabbit_log:alert("Closed ~b local MQTT client connections", [NConnections]),
+ {ok, State};
+handle_event(_Event, State) ->
+ {ok, State}.
+
+handle_call(_Request, State) ->
+ {ok, State}.
+
+handle_info(_Info, State) ->
+ {ok, State}.
+
+terminate(_Reason, _State) ->
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
new file mode 100644
index 0000000000..c3a25096e6
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl
@@ -0,0 +1,1054 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_processor).
+
+-export([info/2, initial_state/2, initial_state/5,
+ process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1,
+ close_connection/1, handle_pre_hibernate/0,
+ handle_ra_event/2]).
+
+%% for testing purposes
+-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2,
+ add_client_id_to_adapter_info/2]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_mqtt_frame.hrl").
+-include("rabbit_mqtt.hrl").
+
+-define(APP, rabbitmq_mqtt).
+-define(FRAME_TYPE(Frame, Type),
+ Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}).
+-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12).
+
+initial_state(Socket, SSLLoginName) ->
+ RealSocket = rabbit_net:unwrap_socket(Socket),
+ {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket),
+ initial_state(RealSocket, SSLLoginName,
+ adapter_info(Socket, 'MQTT'),
+ fun serialise_and_send_to_client/2, PeerAddr).
+
+initial_state(Socket, SSLLoginName,
+ AdapterInfo0 = #amqp_adapter_info{additional_info = Extra},
+ SendFun, PeerAddr) ->
+ {ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} =
+ rabbit_mqtt_util:get_topic_translation_funs(),
+ %% MQTT connections use exactly one channel. The frame max is not
+ %% applicable and there is no way to know what client is used.
+ AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info = [
+ {channels, 1},
+ {channel_max, 1},
+ {frame_max, 0},
+ {client_properties,
+ [{<<"product">>, longstr, <<"MQTT client">>}]} | Extra]},
+ #proc_state{ unacked_pubs = gb_trees:empty(),
+ awaiting_ack = gb_trees:empty(),
+ message_id = 1,
+ subscriptions = #{},
+ consumer_tags = {undefined, undefined},
+ channels = {undefined, undefined},
+ exchange = rabbit_mqtt_util:env(exchange),
+ socket = Socket,
+ adapter_info = AdapterInfo,
+ ssl_login_name = SSLLoginName,
+ send_fun = SendFun,
+ peer_addr = PeerAddr,
+ mqtt2amqp_fun = M2A,
+ amqp2mqtt_fun = A2M}.
+
+process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
+ PState = #proc_state{ connection = undefined } )
+ when Type =/= ?CONNECT ->
+ {error, connect_expected, PState};
+process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }},
+ PState) ->
+ case process_request(Type, Frame, PState) of
+ {ok, PState1} -> {ok, PState1, PState1#proc_state.connection};
+ Ret -> Ret
+ end.
+
+add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) ->
+ AdditionalInfo1 = [{variable_map, #{<<"client_id">> => ClientId}}
+ | AdditionalInfo0],
+ ClientProperties = proplists:get_value(client_properties, AdditionalInfo1, [])
+ ++ [{client_id, longstr, ClientId}],
+ AdditionalInfo2 = case lists:keysearch(client_properties, 1, AdditionalInfo1) of
+ {value, _} ->
+ lists:keyreplace(client_properties,
+ 1,
+ AdditionalInfo1,
+ {client_properties, ClientProperties});
+ false ->
+ [{client_properties, ClientProperties} | AdditionalInfo1]
+ end,
+ AdapterInfo#amqp_adapter_info{additional_info = AdditionalInfo2}.
+
+process_request(?CONNECT,
+ #mqtt_frame{ variable = #mqtt_frame_connect{
+ username = Username,
+ password = Password,
+ proto_ver = ProtoVersion,
+ clean_sess = CleanSess,
+ client_id = ClientId0,
+ keep_alive = Keepalive} = Var},
+ PState0 = #proc_state{ ssl_login_name = SSLLoginName,
+ send_fun = SendFun,
+ adapter_info = AdapterInfo,
+ peer_addr = Addr}) ->
+ ClientId = case ClientId0 of
+ [] -> rabbit_mqtt_util:gen_client_id();
+ [_|_] -> ClientId0
+ end,
+ rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, "
+ "clean session: ~p, protocol version: ~p, keepalive: ~p",
+ [ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]),
+ AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo),
+ PState1 = PState0#proc_state{adapter_info = AdapterInfo1},
+ Ip = list_to_binary(inet:ntoa(Addr)),
+ {Return, PState5} =
+ case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)),
+ ClientId0 =:= [] andalso CleanSess =:= false} of
+ {false, _} ->
+ {?CONNACK_PROTO_VER, PState1};
+ {_, true} ->
+ {?CONNACK_INVALID_ID, PState1};
+ _ ->
+ case creds(Username, Password, SSLLoginName) of
+ nocreds ->
+ rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
+ rabbit_log_connection:error("MQTT login failed: no credentials provided~n"),
+ {?CONNACK_CREDENTIALS, PState1};
+ {invalid_creds, {undefined, Pass}} when is_list(Pass) ->
+ rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt),
+ rabbit_log_connection:error("MQTT login failed: no username is provided"),
+ {?CONNACK_CREDENTIALS, PState1};
+ {invalid_creds, {User, undefined}} when is_list(User) ->
+ rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt),
+ rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]),
+ {?CONNACK_CREDENTIALS, PState1};
+ {UserBin, PassBin} ->
+ case process_login(UserBin, PassBin, ProtoVersion, PState1) of
+ connack_dup_auth ->
+ {SessionPresent0, PState2} = maybe_clean_sess(PState1),
+ {{?CONNACK_ACCEPT, SessionPresent0}, PState2};
+ {?CONNACK_ACCEPT, Conn, VHost, AState} ->
+ case rabbit_mqtt_collector:register(ClientId, self()) of
+ {ok, Corr} ->
+ RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost),
+ link(Conn),
+ {ok, Ch} = amqp_connection:open_channel(Conn),
+ link(Ch),
+ amqp_channel:enable_delivery_flow_control(Ch),
+ Prefetch = rabbit_mqtt_util:env(prefetch),
+ #'basic.qos_ok'{} = amqp_channel:call(Ch,
+ #'basic.qos'{prefetch_count = Prefetch}),
+ rabbit_mqtt_reader:start_keepalive(self(), Keepalive),
+ PState3 = PState1#proc_state{
+ will_msg = make_will_msg(Var),
+ clean_sess = CleanSess,
+ channels = {Ch, undefined},
+ connection = Conn,
+ client_id = ClientId,
+ retainer_pid = RetainerPid,
+ auth_state = AState,
+ register_state = {pending, Corr}},
+ {SessionPresent1, PState4} = maybe_clean_sess(PState3),
+ {{?CONNACK_ACCEPT, SessionPresent1}, PState4};
+ %% e.g. this node was removed from the MQTT cluster members
+ {error, _} = Err ->
+ rabbit_log_connection:error("MQTT cannot accept a connection: "
+ "client ID tracker is unavailable: ~p", [Err]),
+ %% ignore all exceptions, we are shutting down
+ catch amqp_connection:close(Conn),
+ {?CONNACK_SERVER, PState1};
+ {timeout, _} ->
+ rabbit_log_connection:error("MQTT cannot accept a connection: "
+ "client ID registration timed out"),
+ %% ignore all exceptions, we are shutting down
+ catch amqp_connection:close(Conn),
+ {?CONNACK_SERVER, PState1}
+ end;
+ ConnAck -> {ConnAck, PState1}
+ end
+ end
+ end,
+ {ReturnCode, SessionPresent} = case Return of
+ {?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool};
+ Other -> {Other, false}
+ end,
+ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?CONNACK},
+ variable = #mqtt_frame_connack{
+ session_present = SessionPresent,
+ return_code = ReturnCode}},
+ PState5),
+ case ReturnCode of
+ ?CONNACK_ACCEPT -> {ok, PState5};
+ ?CONNACK_CREDENTIALS -> {error, unauthenticated, PState5};
+ ?CONNACK_AUTH -> {error, unauthorized, PState5};
+ ?CONNACK_SERVER -> {error, unavailable, PState5};
+ ?CONNACK_INVALID_ID -> {error, invalid_client_id, PState5};
+ ?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState5}
+ end;
+
+process_request(?PUBACK,
+ #mqtt_frame{
+ variable = #mqtt_frame_publish{ message_id = MessageId }},
+ #proc_state{ channels = {Channel, _},
+ awaiting_ack = Awaiting } = PState) ->
+ %% tag can be missing because of bogus clients and QoS downgrades
+ case gb_trees:is_defined(MessageId, Awaiting) of
+ false ->
+ {ok, PState};
+ true ->
+ Tag = gb_trees:get(MessageId, Awaiting),
+ amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }),
+ {ok, PState#proc_state{ awaiting_ack = gb_trees:delete(MessageId, Awaiting) }}
+ end;
+
+process_request(?PUBLISH,
+ Frame = #mqtt_frame{
+ fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }},
+ PState) ->
+ % Downgrade QOS_2 to QOS_1
+ process_request(?PUBLISH,
+ Frame#mqtt_frame{
+ fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }},
+ PState);
+process_request(?PUBLISH,
+ #mqtt_frame{
+ fixed = #mqtt_frame_fixed{ qos = Qos,
+ retain = Retain,
+ dup = Dup },
+ variable = #mqtt_frame_publish{ topic_name = Topic,
+ message_id = MessageId },
+ payload = Payload },
+ PState = #proc_state{retainer_pid = RPid,
+ amqp2mqtt_fun = Amqp2MqttFun}) ->
+ check_publish(Topic, fun() ->
+ Msg = #mqtt_msg{retain = Retain,
+ qos = Qos,
+ topic = Topic,
+ dup = Dup,
+ message_id = MessageId,
+ payload = Payload},
+ Result = amqp_pub(Msg, PState),
+ case Retain of
+ false -> ok;
+ true -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg)
+ end,
+ {ok, Result}
+ end, PState);
+
+process_request(?SUBSCRIBE,
+ #mqtt_frame{
+ variable = #mqtt_frame_subscribe{
+ message_id = SubscribeMsgId,
+ topic_table = Topics},
+ payload = undefined},
+ #proc_state{channels = {Channel, _},
+ exchange = Exchange,
+ retainer_pid = RPid,
+ send_fun = SendFun,
+ message_id = StateMsgId,
+ mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) ->
+ rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]),
+ check_subscribe(Topics, fun() ->
+ {QosResponse, PState1} =
+ lists:foldl(fun (#mqtt_topic{name = TopicName,
+ qos = Qos}, {QosList, PState}) ->
+ SupportedQos = supported_subs_qos(Qos),
+ {Queue, #proc_state{subscriptions = Subs} = PState1} =
+ ensure_queue(SupportedQos, PState),
+ RoutingKey = Mqtt2AmqpFun(TopicName),
+ Binding = #'queue.bind'{
+ queue = Queue,
+ exchange = Exchange,
+ routing_key = RoutingKey},
+ #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding),
+ SupportedQosList = case maps:find(TopicName, Subs) of
+ {ok, L} -> [SupportedQos|L];
+ error -> [SupportedQos]
+ end,
+ {[SupportedQos | QosList],
+ PState1 #proc_state{
+ subscriptions =
+ maps:put(TopicName, SupportedQosList, Subs)}}
+ end, {[], PState0}, Topics),
+ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK},
+ variable = #mqtt_frame_suback{
+ message_id = SubscribeMsgId,
+ qos_table = QosResponse}}, PState1),
+ %% we may need to send up to length(Topics) messages.
+ %% if QoS is > 0 then we need to generate a message id,
+ %% and increment the counter.
+ StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId),
+ N = lists:foldl(fun (Topic, Acc) ->
+ case maybe_send_retained_message(RPid, Topic, Acc, PState1) of
+ {true, X} -> Acc + X;
+ false -> Acc
+ end
+ end, StartMsgId, Topics),
+ {ok, PState1#proc_state{message_id = N}}
+ end, PState0);
+
+process_request(?UNSUBSCRIBE,
+ #mqtt_frame{
+ variable = #mqtt_frame_subscribe{ message_id = MessageId,
+ topic_table = Topics },
+ payload = undefined }, #proc_state{ channels = {Channel, _},
+ exchange = Exchange,
+ client_id = ClientId,
+ subscriptions = Subs0,
+ send_fun = SendFun,
+ mqtt2amqp_fun = Mqtt2AmqpFun } = PState) ->
+ rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]),
+ Queues = rabbit_mqtt_util:subcription_queue_name(ClientId),
+ Subs1 =
+ lists:foldl(
+ fun (#mqtt_topic{ name = TopicName }, Subs) ->
+ QosSubs = case maps:find(TopicName, Subs) of
+ {ok, Val} when is_list(Val) -> lists:usort(Val);
+ error -> []
+ end,
+ RoutingKey = Mqtt2AmqpFun(TopicName),
+ lists:foreach(
+ fun (QosSub) ->
+ Queue = element(QosSub + 1, Queues),
+ Binding = #'queue.unbind'{
+ queue = Queue,
+ exchange = Exchange,
+ routing_key = RoutingKey},
+ #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding)
+ end, QosSubs),
+ maps:remove(TopicName, Subs)
+ end, Subs0, Topics),
+ SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK },
+ variable = #mqtt_frame_suback{ message_id = MessageId }},
+ PState),
+ {ok, PState #proc_state{ subscriptions = Subs1 }};
+
+process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) ->
+ rabbit_log_connection:debug("Received a PINGREQ"),
+ SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }},
+ PState),
+ rabbit_log_connection:debug("Sent a PINGRESP"),
+ {ok, PState};
+
+process_request(?DISCONNECT, #mqtt_frame{}, PState) ->
+ rabbit_log_connection:debug("Received a DISCONNECT"),
+ {stop, PState}.
+
+hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) ->
+ Topic1 = Amqp2MqttFun(Topic0),
+ rabbit_mqtt_retainer:clear(RetainerPid, Topic1),
+ ok;
+hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) ->
+ Topic1 = Amqp2MqttFun(Topic0),
+ rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg),
+ ok.
+
+maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId,
+ #proc_state{ send_fun = SendFun,
+ amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
+ Topic1 = Amqp2MqttFun(Topic0),
+ case rabbit_mqtt_retainer:fetch(RPid, Topic1) of
+ undefined -> false;
+ Msg ->
+ %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS
+ %% and retained message QoS. The spec isn't super clear on this, we
+ %% do what Mosquitto does, per user feedback.
+ Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos),
+ Id = case Qos of
+ ?QOS_0 -> undefined;
+ ?QOS_1 -> MsgId
+ end,
+ SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{
+ type = ?PUBLISH,
+ qos = Qos,
+ dup = false,
+ retain = Msg#mqtt_msg.retain
+ }, variable = #mqtt_frame_publish{
+ message_id = Id,
+ topic_name = Topic1
+ },
+ payload = Msg#mqtt_msg.payload}, PState),
+ case Qos of
+ ?QOS_0 -> false;
+ ?QOS_1 -> {true, 1}
+ end
+ end.
+
+amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag,
+ delivery_tag = DeliveryTag,
+ routing_key = RoutingKey },
+ #amqp_msg{ props = #'P_basic'{ headers = Headers },
+ payload = Payload },
+ DeliveryCtx} = Delivery,
+ #proc_state{ channels = {Channel, _},
+ awaiting_ack = Awaiting,
+ message_id = MsgId,
+ send_fun = SendFun,
+ amqp2mqtt_fun = Amqp2MqttFun } = PState) ->
+ amqp_channel:notify_received(DeliveryCtx),
+ case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of
+ {true, {?QOS_0, ?QOS_1}} ->
+ amqp_channel:cast(
+ Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
+ {ok, PState};
+ {true, {?QOS_0, ?QOS_0}} ->
+ {ok, PState};
+ {Dup, {DeliveryQos, _SubQos} = Qos} ->
+ TopicName = Amqp2MqttFun(RoutingKey),
+ SendFun(
+ #mqtt_frame{ fixed = #mqtt_frame_fixed{
+ type = ?PUBLISH,
+ qos = DeliveryQos,
+ dup = Dup },
+ variable = #mqtt_frame_publish{
+ message_id =
+ case DeliveryQos of
+ ?QOS_0 -> undefined;
+ ?QOS_1 -> MsgId
+ end,
+ topic_name = TopicName },
+ payload = Payload}, PState),
+ case Qos of
+ {?QOS_0, ?QOS_0} ->
+ {ok, PState};
+ {?QOS_1, ?QOS_1} ->
+ Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting),
+ PState1 = PState#proc_state{ awaiting_ack = Awaiting1 },
+ PState2 = next_msg_id(PState1),
+ {ok, PState2};
+ {?QOS_0, ?QOS_1} ->
+ amqp_channel:cast(
+ Channel, #'basic.ack'{ delivery_tag = DeliveryTag }),
+ {ok, PState}
+ end
+ end;
+
+amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack,
+ PState = #proc_state{ unacked_pubs = UnackedPubs,
+ send_fun = SendFun }) ->
+ case gb_trees:size(UnackedPubs) > 0 andalso
+ gb_trees:take_smallest(UnackedPubs) of
+ {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag ->
+ SendFun(
+ #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
+ variable = #mqtt_frame_publish{ message_id = MsgId }},
+ PState),
+ amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 });
+ _ ->
+ {ok, PState}
+ end;
+
+amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag },
+ PState = #proc_state{ unacked_pubs = UnackedPubs,
+ send_fun = SendFun }) ->
+ SendFun(
+ #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK },
+ variable = #mqtt_frame_publish{
+ message_id = gb_trees:get(
+ Tag, UnackedPubs) }}, PState),
+ {ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}.
+
+delivery_dup({#'basic.deliver'{ redelivered = Redelivered },
+ #amqp_msg{ props = #'P_basic'{ headers = Headers }},
+ _DeliveryCtx}) ->
+ case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of
+ undefined -> Redelivered;
+ {bool, Dup} -> Redelivered orelse Dup
+ end.
+
+ensure_valid_mqtt_message_id(Id) when Id >= 16#ffff ->
+ 1;
+ensure_valid_mqtt_message_id(Id) ->
+ Id.
+
+safe_max_id(Id0, Id1) ->
+ ensure_valid_mqtt_message_id(erlang:max(Id0, Id1)).
+
+next_msg_id(PState = #proc_state{ message_id = MsgId0 }) ->
+ MsgId1 = ensure_valid_mqtt_message_id(MsgId0 + 1),
+ PState#proc_state{ message_id = MsgId1 }.
+
+%% decide at which qos level to deliver based on subscription
+%% and the message publish qos level. non-MQTT publishes are
+%% assumed to be qos 1, regardless of delivery_mode.
+delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) ->
+ {?QOS_0, ?QOS_0};
+delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) ->
+ case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of
+ {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1};
+ undefined -> {?QOS_1, ?QOS_1}
+ end.
+
+maybe_clean_sess(PState = #proc_state { clean_sess = false,
+ connection = Conn,
+ client_id = ClientId }) ->
+ SessionPresent = session_present(Conn, ClientId),
+ {_Queue, PState1} = ensure_queue(?QOS_1, PState),
+ {SessionPresent, PState1};
+maybe_clean_sess(PState = #proc_state { clean_sess = true,
+ connection = Conn,
+ client_id = ClientId }) ->
+ {_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId),
+ {ok, Channel} = amqp_connection:open_channel(Conn),
+ ok = try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of
+ #'queue.delete_ok'{} -> ok
+ catch
+ exit:_Error -> ok
+ after
+ amqp_channel:close(Channel)
+ end,
+ {false, PState}.
+
+session_present(Conn, ClientId) ->
+ {_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
+ Declare = #'queue.declare'{queue = QueueQ1,
+ passive = true},
+ {ok, Channel} = amqp_connection:open_channel(Conn),
+ try
+ amqp_channel:call(Channel, Declare),
+ amqp_channel:close(Channel),
+ true
+ catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} ->
+ false
+ end.
+
+make_will_msg(#mqtt_frame_connect{ will_flag = false }) ->
+ undefined;
+make_will_msg(#mqtt_frame_connect{ will_retain = Retain,
+ will_qos = Qos,
+ will_topic = Topic,
+ will_msg = Msg }) ->
+ #mqtt_msg{ retain = Retain,
+ qos = Qos,
+ topic = Topic,
+ dup = false,
+ payload = Msg }.
+
+process_login(_UserBin, _PassBin, _ProtoVersion,
+ #proc_state{channels = {Channel, _},
+ peer_addr = Addr,
+ auth_state = #auth_state{username = Username,
+ vhost = VHost}}) when is_pid(Channel) ->
+ UsernameStr = rabbit_data_coercion:to_list(Username),
+ VHostStr = rabbit_data_coercion:to_list(VHost),
+ rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt),
+ rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~p, vhost ~p",
+ [UsernameStr, VHostStr]),
+ connack_dup_auth;
+process_login(UserBin, PassBin, ProtoVersion,
+ #proc_state{channels = {undefined, undefined},
+ socket = Sock,
+ adapter_info = AdapterInfo,
+ ssl_login_name = SslLoginName,
+ peer_addr = Addr}) ->
+ {ok, {_, _, _, ToPort}} = rabbit_net:socket_ends(Sock, inbound),
+ {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, ToPort),
+ rabbit_log_connection:info(
+ "MQTT vhost picked using ~s~n",
+ [human_readable_vhost_lookup_strategy(VHostPickedUsing)]),
+ RemoteAddress = list_to_binary(inet:ntoa(Addr)),
+ case rabbit_vhost:exists(VHost) of
+ true ->
+ case amqp_connection:start(#amqp_params_direct{
+ username = UsernameBin,
+ password = PassBin,
+ virtual_host = VHost,
+ adapter_info = set_proto_version(AdapterInfo, ProtoVersion)}) of
+ {ok, Connection} ->
+ case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of
+ ok ->
+ rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin,
+ mqtt),
+ [{internal_user, InternalUser}] = amqp_connection:info(
+ Connection, [internal_user]),
+ {?CONNACK_ACCEPT, Connection, VHost,
+ #auth_state{user = InternalUser,
+ username = UsernameBin,
+ vhost = VHost}};
+ not_allowed ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin,
+ mqtt),
+ amqp_connection:close(Connection),
+ rabbit_log_connection:warning(
+ "MQTT login failed for ~p access_refused "
+ "(access must be from localhost)~n",
+ [binary_to_list(UsernameBin)]),
+ ?CONNACK_AUTH
+ end;
+ {error, {auth_failure, Explanation}} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
+ rabbit_log_connection:error("MQTT login failed for user '~p' auth_failure: ~s~n",
+ [binary_to_list(UserBin), Explanation]),
+ ?CONNACK_CREDENTIALS;
+ {error, access_refused} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
+ rabbit_log_connection:warning("MQTT login failed for user '~p': access_refused "
+ "(vhost access not allowed)~n",
+ [binary_to_list(UserBin)]),
+ ?CONNACK_AUTH;
+ {error, not_allowed} ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
+ %% when vhost allowed for TLS connection
+ rabbit_log_connection:warning("MQTT login failed for ~p access_refused "
+ "(vhost access not allowed)~n",
+ [binary_to_list(UserBin)]),
+ ?CONNACK_AUTH
+ end;
+ false ->
+ rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt),
+ rabbit_log_connection:error("MQTT login failed for user '~p' auth_failure: vhost ~s does not exist~n",
+ [binary_to_list(UserBin), VHost]),
+ ?CONNACK_CREDENTIALS
+ end.
+
+get_vhost(UserBin, none, Port) ->
+ get_vhost_no_ssl(UserBin, Port);
+get_vhost(UserBin, undefined, Port) ->
+ get_vhost_no_ssl(UserBin, Port);
+get_vhost(UserBin, SslLogin, Port) ->
+ get_vhost_ssl(UserBin, SslLogin, Port).
+
+get_vhost_no_ssl(UserBin, Port) ->
+ case vhost_in_username(UserBin) of
+ true ->
+ {vhost_in_username_or_default, get_vhost_username(UserBin)};
+ false ->
+ PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
+ mqtt_port_to_vhost_mapping
+ ),
+ case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
+ undefined ->
+ {default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}};
+ VHost ->
+ {port_to_vhost_mapping, {VHost, UserBin}}
+ end
+ end.
+
+get_vhost_ssl(UserBin, SslLoginName, Port) ->
+ UserVirtualHostMapping = rabbit_runtime_parameters:value_global(
+ mqtt_default_vhosts
+ ),
+ case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of
+ undefined ->
+ PortVirtualHostMapping = rabbit_runtime_parameters:value_global(
+ mqtt_port_to_vhost_mapping
+ ),
+ case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of
+ undefined ->
+ {vhost_in_username_or_default, get_vhost_username(UserBin)};
+ VHostFromPortMapping ->
+ {port_to_vhost_mapping, {VHostFromPortMapping, UserBin}}
+ end;
+ VHostFromCertMapping ->
+ {cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}}
+ end.
+
+vhost_in_username(UserBin) ->
+ case application:get_env(?APP, ignore_colons_in_username) of
+ {ok, true} -> false;
+ _ ->
+ %% split at the last colon, disallowing colons in username
+ case re:split(UserBin, ":(?!.*?:)") of
+ [_, _] -> true;
+ [UserBin] -> false
+ end
+ end.
+
+get_vhost_username(UserBin) ->
+ Default = {rabbit_mqtt_util:env(vhost), UserBin},
+ case application:get_env(?APP, ignore_colons_in_username) of
+ {ok, true} -> Default;
+ _ ->
+ %% split at the last colon, disallowing colons in username
+ case re:split(UserBin, ":(?!.*?:)") of
+ [Vhost, UserName] -> {Vhost, UserName};
+ [UserBin] -> Default
+ end
+ end.
+
+get_vhost_from_user_mapping(_User, not_found) ->
+ undefined;
+get_vhost_from_user_mapping(User, Mapping) ->
+ M = rabbit_data_coercion:to_proplist(Mapping),
+ case rabbit_misc:pget(User, M) of
+ undefined ->
+ undefined;
+ VHost ->
+ VHost
+ end.
+
+get_vhost_from_port_mapping(_Port, not_found) ->
+ undefined;
+get_vhost_from_port_mapping(Port, Mapping) ->
+ M = rabbit_data_coercion:to_proplist(Mapping),
+ Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of
+ undefined ->
+ undefined;
+ VHost ->
+ VHost
+ end,
+ Res.
+
+human_readable_vhost_lookup_strategy(vhost_in_username_or_default) ->
+ "vhost in username or default";
+human_readable_vhost_lookup_strategy(port_to_vhost_mapping) ->
+ "MQTT port to vhost mapping";
+human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) ->
+ "client certificate to vhost mapping";
+human_readable_vhost_lookup_strategy(default_vhost) ->
+ "plugin configuration or default";
+human_readable_vhost_lookup_strategy(Val) ->
+ atom_to_list(Val).
+
+creds(User, Pass, SSLLoginName) ->
+ DefaultUser = rabbit_mqtt_util:env(default_user),
+ DefaultPass = rabbit_mqtt_util:env(default_pass),
+ {ok, Anon} = application:get_env(?APP, allow_anonymous),
+ {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login),
+ HaveDefaultCreds = Anon =:= true andalso
+ is_binary(DefaultUser) andalso
+ is_binary(DefaultPass),
+
+ CredentialsProvided = User =/= undefined orelse
+ Pass =/= undefined,
+
+ CorrectCredentials = is_list(User) andalso
+ is_list(Pass),
+
+ SSLLoginProvided = TLSAuth =:= true andalso
+ SSLLoginName =/= none,
+
+ case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of
+ %% Username and password take priority
+ {true, true, _, _} -> {list_to_binary(User),
+ list_to_binary(Pass)};
+ %% Either username or password is provided
+ {true, false, _, _} -> {invalid_creds, {User, Pass}};
+ %% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided.
+ %% Authenticating using username only.
+ {false, false, true, _} -> {SSLLoginName, none};
+ %% Anonymous connection uses default credentials
+ {false, false, false, true} -> {DefaultUser, DefaultPass};
+ _ -> nocreds
+ end.
+
+supported_subs_qos(?QOS_0) -> ?QOS_0;
+supported_subs_qos(?QOS_1) -> ?QOS_1;
+supported_subs_qos(?QOS_2) -> ?QOS_1.
+
+delivery_mode(?QOS_0) -> 1;
+delivery_mode(?QOS_1) -> 2;
+delivery_mode(?QOS_2) -> 2.
+
+%% different qos subscriptions are received in different queues
+%% with appropriate durability and timeout arguments
+%% this will lead to duplicate messages for overlapping subscriptions
+%% with different qos values - todo: prevent duplicates
+ensure_queue(Qos, #proc_state{ channels = {Channel, _},
+ client_id = ClientId,
+ clean_sess = CleanSess,
+ consumer_tags = {TagQ0, TagQ1} = Tags} = PState) ->
+ {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId),
+ Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of
+ {undefined, _} ->
+ [];
+ {Ms, false} when is_integer(Ms) ->
+ [{<<"x-expires">>, long, Ms}];
+ _ ->
+ []
+ end,
+ QueueSetup =
+ case {TagQ0, TagQ1, Qos} of
+ {undefined, _, ?QOS_0} ->
+ {QueueQ0,
+ #'queue.declare'{ queue = QueueQ0,
+ durable = false,
+ auto_delete = true },
+ #'basic.consume'{ queue = QueueQ0,
+ no_ack = true }};
+ {_, undefined, ?QOS_1} ->
+ {QueueQ1,
+ #'queue.declare'{ queue = QueueQ1,
+ durable = true,
+ %% Clean session means a transient connection,
+ %% translating into auto-delete.
+ %%
+ %% see rabbitmq/rabbitmq-mqtt#37
+ auto_delete = CleanSess,
+ arguments = Qos1Args },
+ #'basic.consume'{ queue = QueueQ1,
+ no_ack = false }};
+ {_, _, ?QOS_0} ->
+ {exists, QueueQ0};
+ {_, _, ?QOS_1} ->
+ {exists, QueueQ1}
+ end,
+ case QueueSetup of
+ {Queue, Declare, Consume} ->
+ #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare),
+ #'basic.consume_ok'{ consumer_tag = Tag } =
+ amqp_channel:call(Channel, Consume),
+ {Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }};
+ {exists, Q} ->
+ {Q, PState}
+ end.
+
+send_will(PState = #proc_state{will_msg = undefined}) ->
+ PState;
+
+send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain,
+ topic = Topic},
+ retainer_pid = RPid,
+ channels = {ChQos0, ChQos1},
+ amqp2mqtt_fun = Amqp2MqttFun}) ->
+ case check_topic_access(Topic, write, PState) of
+ ok ->
+ amqp_pub(WillMsg, PState),
+ case Retain of
+ false -> ok;
+ true ->
+ hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg)
+ end;
+ Error ->
+ rabbit_log:warning(
+ "Could not send last will: ~p~n",
+ [Error])
+ end,
+ case ChQos1 of
+ undefined -> ok;
+ _ -> amqp_channel:close(ChQos1)
+ end,
+ case ChQos0 of
+ undefined -> ok;
+ _ -> amqp_channel:close(ChQos0)
+ end,
+ PState #proc_state{ channels = {undefined, undefined} }.
+
+amqp_pub(undefined, PState) ->
+ PState;
+
+%% set up a qos1 publishing channel if necessary
+%% this channel will only be used for publishing, not consuming
+amqp_pub(Msg = #mqtt_msg{ qos = ?QOS_1 },
+ PState = #proc_state{ channels = {ChQos0, undefined},
+ awaiting_seqno = undefined,
+ connection = Conn }) ->
+ {ok, Channel} = amqp_connection:open_channel(Conn),
+ #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}),
+ amqp_channel:register_confirm_handler(Channel, self()),
+ amqp_pub(Msg, PState #proc_state{ channels = {ChQos0, Channel},
+ awaiting_seqno = 1 });
+
+amqp_pub(#mqtt_msg{ qos = Qos,
+ topic = Topic,
+ dup = Dup,
+ message_id = MessageId,
+ payload = Payload },
+ PState = #proc_state{ channels = {ChQos0, ChQos1},
+ exchange = Exchange,
+ unacked_pubs = UnackedPubs,
+ awaiting_seqno = SeqNo,
+ mqtt2amqp_fun = Mqtt2AmqpFun }) ->
+ RoutingKey = Mqtt2AmqpFun(Topic),
+ Method = #'basic.publish'{ exchange = Exchange,
+ routing_key = RoutingKey },
+ Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos},
+ {<<"x-mqtt-dup">>, bool, Dup}],
+ Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers,
+ delivery_mode = delivery_mode(Qos)},
+ payload = Payload },
+ {UnackedPubs1, Ch, SeqNo1} =
+ case Qos =:= ?QOS_1 andalso MessageId =/= undefined of
+ true -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1,
+ SeqNo + 1};
+ false -> {UnackedPubs, ChQos0, SeqNo}
+ end,
+ amqp_channel:cast_flow(Ch, Method, Msg),
+ PState #proc_state{ unacked_pubs = UnackedPubs1,
+ awaiting_seqno = SeqNo1 }.
+
+adapter_info(Sock, ProtoName) ->
+ amqp_connection:socket_adapter_info(Sock, {ProtoName, "N/A"}).
+
+set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) ->
+ AdapterInfo#amqp_adapter_info{protocol = {Proto,
+ human_readable_mqtt_version(Vsn)}}.
+
+human_readable_mqtt_version(3) ->
+ "3.1.0";
+human_readable_mqtt_version(4) ->
+ "3.1.1";
+human_readable_mqtt_version(_) ->
+ "N/A".
+
+serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) ->
+ try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of
+ Res ->
+ Res
+ catch _:Error ->
+ rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"),
+ rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p",
+ [Sock, Error, Frame])
+ end.
+
+close_connection(PState = #proc_state{ connection = undefined }) ->
+ PState;
+close_connection(PState = #proc_state{ connection = Connection,
+ client_id = ClientId }) ->
+ % todo: maybe clean session
+ case ClientId of
+ undefined -> ok;
+ _ ->
+ case rabbit_mqtt_collector:unregister(ClientId, self()) of
+ ok -> ok;
+ %% ignore as we are shutting down
+ {timeout, _} -> ok
+ end
+ end,
+ %% ignore noproc or other exceptions, we are shutting down
+ catch amqp_connection:close(Connection),
+ PState #proc_state{ channels = {undefined, undefined},
+ connection = undefined }.
+
+handle_pre_hibernate() ->
+ erase(topic_permission_cache),
+ ok.
+
+handle_ra_event({applied, [{Corr, ok}]},
+ PState = #proc_state{register_state = {pending, Corr}}) ->
+ %% success case - command was applied transition into registered state
+ PState#proc_state{register_state = registered};
+handle_ra_event({not_leader, Leader, Corr},
+ PState = #proc_state{register_state = {pending, Corr},
+ client_id = ClientId}) ->
+ %% retry command against actual leader
+ {ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()),
+ PState#proc_state{register_state = {pending, NewCorr}};
+handle_ra_event(register_timeout,
+ PState = #proc_state{register_state = {pending, _Corr},
+ client_id = ClientId}) ->
+ {ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()),
+ PState#proc_state{register_state = {pending, NewCorr}};
+handle_ra_event(register_timeout, PState) ->
+ PState;
+handle_ra_event(Evt, PState) ->
+ %% log these?
+ rabbit_log:debug("unhandled ra_event: ~w ~n", [Evt]),
+ PState.
+
+%% NB: check_*: MQTT spec says we should ack normally, ie pretend there
+%% was no auth error, but here we are closing the connection with an error. This
+%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level.
+
+check_publish(TopicName, Fn, PState) ->
+ case check_topic_access(TopicName, write, PState) of
+ ok -> Fn();
+ _ -> {error, unauthorized, PState}
+ end.
+
+check_subscribe([], Fn, _) ->
+ Fn();
+
+check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) ->
+ case check_topic_access(TopicName, read, PState) of
+ ok -> check_subscribe(Topics, Fn, PState);
+ _ -> {error, unauthorized, PState}
+ end.
+
+check_topic_access(TopicName, Access,
+ #proc_state{
+ auth_state = #auth_state{user = User = #user{username = Username},
+ vhost = VHost},
+ exchange = Exchange,
+ client_id = ClientId,
+ mqtt2amqp_fun = Mqtt2AmqpFun }) ->
+ Cache =
+ case get(topic_permission_cache) of
+ undefined -> [];
+ Other -> Other
+ end,
+
+ Key = {TopicName, Username, ClientId, VHost, Exchange, Access},
+ case lists:member(Key, Cache) of
+ true ->
+ ok;
+ false ->
+ Resource = #resource{virtual_host = VHost,
+ kind = topic,
+ name = Exchange},
+
+ RoutingKey = Mqtt2AmqpFun(TopicName),
+ Context = #{routing_key => RoutingKey,
+ variable_map => #{
+ <<"username">> => Username,
+ <<"vhost">> => VHost,
+ <<"client_id">> => rabbit_data_coercion:to_binary(ClientId)
+ }
+ },
+
+ try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of
+ ok ->
+ CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1),
+ put(topic_permission_cache, [Key | CacheTail]),
+ ok;
+ R ->
+ R
+ catch
+ _:{amqp_error, access_refused, Msg, _} ->
+ rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]),
+ {error, access_refused};
+ _:Error ->
+ rabbit_log:error("~p~n", [Error]),
+ {error, access_refused}
+ end
+ end.
+
+info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val;
+info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val;
+info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val;
+info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val;
+info(message_id, #proc_state{message_id = Val}) -> Val;
+info(client_id, #proc_state{client_id = Val}) ->
+ rabbit_data_coercion:to_binary(Val);
+info(clean_sess, #proc_state{clean_sess = Val}) -> Val;
+info(will_msg, #proc_state{will_msg = Val}) -> Val;
+info(channels, #proc_state{channels = Val}) -> Val;
+info(exchange, #proc_state{exchange = Val}) -> Val;
+info(adapter_info, #proc_state{adapter_info = Val}) -> Val;
+info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val;
+info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val;
+info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val;
+info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val;
+info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val;
+info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val;
+info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val;
+info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val;
+info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) ->
+ case Val of
+ {Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)};
+ Other -> Other
+ end;
+info(channels, PState) -> additional_info(channels, PState);
+info(channel_max, PState) -> additional_info(channel_max, PState);
+info(frame_max, PState) -> additional_info(frame_max, PState);
+info(client_properties, PState) -> additional_info(client_properties, PState);
+info(ssl, PState) -> additional_info(ssl, PState);
+info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState);
+info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState);
+info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState);
+info(ssl_hash, PState) -> additional_info(ssl_hash, PState);
+info(Other, _) -> throw({bad_argument, Other}).
+
+
+additional_info(Key,
+ #proc_state{adapter_info =
+ #amqp_adapter_info{additional_info = AddInfo}}) ->
+ proplists:get_value(Key, AddInfo).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
new file mode 100644
index 0000000000..39c0761321
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl
@@ -0,0 +1,480 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_reader).
+
+%% Transitional step until we can require Erlang/OTP 21 and
+%% use the now recommended try/catch syntax for obtaining the stack trace.
+-compile(nowarn_deprecated_function).
+
+-behaviour(gen_server2).
+
+-export([start_link/2]).
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ code_change/3, terminate/2, handle_pre_hibernate/1]).
+
+-export([conserve_resources/3, start_keepalive/2,
+ close_connection/2]).
+
+-export([ssl_login_name/1]).
+-export([info/2]).
+
+-include_lib("amqp_client/include/amqp_client.hrl").
+-include("rabbit_mqtt.hrl").
+
+-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]).
+-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state]).
+
+%%----------------------------------------------------------------------------
+
+start_link(KeepaliveSup, Ref) ->
+ Pid = proc_lib:spawn_link(?MODULE, init,
+ [[KeepaliveSup, Ref]]),
+
+ {ok, Pid}.
+
+conserve_resources(Pid, _, {_, Conserve, _}) ->
+ Pid ! {conserve_resources, Conserve},
+ ok.
+
+info(Pid, InfoItems) ->
+ case InfoItems -- ?INFO_ITEMS of
+ [] -> gen_server2:call(Pid, {info, InfoItems});
+ UnknownItems -> throw({bad_argument, UnknownItems})
+ end.
+
+close_connection(Pid, Reason) ->
+ gen_server:cast(Pid, {close_connection, Reason}).
+
+%%----------------------------------------------------------------------------
+
+init([KeepaliveSup, Ref]) ->
+ process_flag(trap_exit, true),
+ {ok, Sock} = rabbit_networking:handshake(Ref,
+ application:get_env(rabbitmq_mqtt, proxy_protocol, false)),
+ RealSocket = rabbit_net:unwrap_socket(Sock),
+ case rabbit_net:connection_string(Sock, inbound) of
+ {ok, ConnStr} ->
+ rabbit_log_connection:debug("MQTT accepting TCP connection ~p (~s)~n", [self(), ConnStr]),
+ rabbit_alarm:register(
+ self(), {?MODULE, conserve_resources, []}),
+ ProcessorState = rabbit_mqtt_processor:initial_state(Sock,ssl_login_name(RealSocket)),
+ gen_server2:enter_loop(?MODULE, [],
+ rabbit_event:init_stats_timer(
+ control_throttle(
+ #state{socket = RealSocket,
+ conn_name = ConnStr,
+ await_recv = false,
+ connection_state = running,
+ received_connect_frame = false,
+ keepalive = {none, none},
+ keepalive_sup = KeepaliveSup,
+ conserve = false,
+ parse_state = rabbit_mqtt_frame:initial_state(),
+ proc_state = ProcessorState }), #state.stats_timer),
+ {backoff, 1000, 1000, 10000});
+ {network_error, Reason} ->
+ rabbit_net:fast_close(RealSocket),
+ terminate({shutdown, Reason}, undefined);
+ {error, enotconn} ->
+ rabbit_net:fast_close(RealSocket),
+ terminate(shutdown, undefined);
+ {error, Reason} ->
+ rabbit_net:fast_close(RealSocket),
+ terminate({network_error, Reason}, undefined)
+ end.
+
+handle_call({info, InfoItems}, _From, State) ->
+ Infos = lists:map(
+ fun(InfoItem) ->
+ {InfoItem, info_internal(InfoItem, State)}
+ end,
+ InfoItems),
+ {reply, Infos, State};
+
+handle_call(Msg, From, State) ->
+ {stop, {mqtt_unexpected_call, Msg, From}, State}.
+
+handle_cast(duplicate_id,
+ State = #state{ proc_state = PState,
+ conn_name = ConnName }) ->
+ rabbit_log_connection:warning("MQTT disconnecting client ~p with duplicate id '~s'~n",
+ [ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
+ {stop, {shutdown, duplicate_id}, State};
+
+handle_cast(decommission_node,
+ State = #state{ proc_state = PState,
+ conn_name = ConnName }) ->
+ rabbit_log_connection:warning("MQTT disconnecting client ~p with client ID '~s' as its node is about"
+ " to be decommissioned~n",
+ [ConnName, rabbit_mqtt_processor:info(client_id, PState)]),
+ {stop, {shutdown, decommission_node}, State};
+
+handle_cast({close_connection, Reason},
+ State = #state{conn_name = ConnName, proc_state = PState}) ->
+ rabbit_log_connection:warning("MQTT disconnecting client ~p with client ID '~s', reason: ~s",
+ [ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]),
+ {stop, {shutdown, server_initiated_close}, State};
+
+handle_cast(Msg, State) ->
+ {stop, {mqtt_unexpected_cast, Msg}, State}.
+
+handle_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery,
+ State = #state{ proc_state = ProcState }) ->
+ callback_reply(State, rabbit_mqtt_processor:amqp_callback(Delivery,
+ ProcState));
+
+handle_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState }) ->
+ callback_reply(State, rabbit_mqtt_processor:amqp_callback(Ack, ProcState));
+
+handle_info(#'basic.consume_ok'{}, State) ->
+ {noreply, State, hibernate};
+
+handle_info(#'basic.cancel'{}, State) ->
+ {stop, {shutdown, subscription_cancelled}, State};
+
+handle_info({'EXIT', _Conn, Reason}, State) ->
+ {stop, {connection_died, Reason}, State};
+
+handle_info({Tag, Sock, Data},
+ State = #state{ socket = Sock, connection_state = blocked })
+ when Tag =:= tcp; Tag =:= ssl ->
+ {noreply, State#state{ deferred_recv = Data }, hibernate};
+
+handle_info({Tag, Sock, Data},
+ State = #state{ socket = Sock, connection_state = running })
+ when Tag =:= tcp; Tag =:= ssl ->
+ process_received_bytes(
+ Data, control_throttle(State #state{ await_recv = false }));
+
+handle_info({Tag, Sock}, State = #state{socket = Sock})
+ when Tag =:= tcp_closed; Tag =:= ssl_closed ->
+ network_error(closed, State);
+
+handle_info({Tag, Sock, Reason}, State = #state{socket = Sock})
+ when Tag =:= tcp_error; Tag =:= ssl_error ->
+ network_error(Reason, State);
+
+handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) ->
+ {noreply, State, hibernate};
+
+handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) ->
+ network_error(Reason, State);
+
+handle_info({conserve_resources, Conserve}, State) ->
+ maybe_process_deferred_recv(
+ control_throttle(State #state{ conserve = Conserve }));
+
+handle_info({bump_credit, Msg}, State) ->
+ credit_flow:handle_bump_msg(Msg),
+ maybe_process_deferred_recv(control_throttle(State));
+
+handle_info({start_keepalives, Keepalive},
+ State = #state { keepalive_sup = KeepaliveSup, socket = Sock }) ->
+ %% Only the client has the responsibility for sending keepalives
+ SendFun = fun() -> ok end,
+ Parent = self(),
+ ReceiveFun = fun() -> Parent ! keepalive_timeout end,
+ Heartbeater = rabbit_heartbeat:start(
+ KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun),
+ {noreply, State #state { keepalive = Heartbeater }};
+
+handle_info(keepalive_timeout, State = #state {conn_name = ConnStr,
+ proc_state = PState}) ->
+ rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)~n", [ConnStr]),
+ send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State);
+
+handle_info(emit_stats, State) ->
+ {noreply, emit_stats(State), hibernate};
+
+handle_info({ra_event, _From, Evt},
+ #state{proc_state = PState} = State) ->
+ %% handle applied event to ensure registration command actually got applied
+ %% handle not_leader notification in case we send the command to a non-leader
+ PState1 = rabbit_mqtt_processor:handle_ra_event(Evt, PState),
+ {noreply, State#state{proc_state = PState1}, hibernate};
+
+handle_info(Msg, State) ->
+ {stop, {mqtt_unexpected_msg, Msg}, State}.
+
+terminate(Reason, State) ->
+ maybe_emit_stats(State),
+ do_terminate(Reason, State).
+
+handle_pre_hibernate(State) ->
+ rabbit_mqtt_processor:handle_pre_hibernate(),
+ {hibernate, State}.
+
+do_terminate({network_error, {ssl_upgrade_error, closed}, ConnStr}, _State) ->
+ rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: connection closed~n",
+ [ConnStr]);
+
+do_terminate({network_error,
+ {ssl_upgrade_error,
+ {tls_alert, "handshake failure"}}, ConnStr}, _State) ->
+ log_tls_alert(handshake_failure, ConnStr);
+do_terminate({network_error,
+ {ssl_upgrade_error,
+ {tls_alert, "unknown ca"}}, ConnStr}, _State) ->
+ log_tls_alert(unknown_ca, ConnStr);
+do_terminate({network_error,
+ {ssl_upgrade_error,
+ {tls_alert, {Err, _}}}, ConnStr}, _State) ->
+ log_tls_alert(Err, ConnStr);
+do_terminate({network_error,
+ {ssl_upgrade_error,
+ {tls_alert, Alert}}, ConnStr}, _State) ->
+ log_tls_alert(Alert, ConnStr);
+do_terminate({network_error, {ssl_upgrade_error, Reason}, ConnStr}, _State) ->
+ rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: ~p~n",
+ [ConnStr, Reason]);
+
+do_terminate({network_error, Reason, ConnStr}, _State) ->
+ rabbit_log_connection:error("MQTT detected network error on ~s: ~p~n",
+ [ConnStr, Reason]);
+
+do_terminate({network_error, Reason}, _State) ->
+ rabbit_log_connection:error("MQTT detected network error: ~p~n", [Reason]);
+
+do_terminate(normal, #state{proc_state = ProcState,
+ conn_name = ConnName}) ->
+ rabbit_mqtt_processor:close_connection(ProcState),
+ rabbit_log_connection:info("closing MQTT connection ~p (~s)~n", [self(), ConnName]),
+ ok;
+
+do_terminate(_Reason, #state{proc_state = ProcState}) ->
+ rabbit_mqtt_processor:close_connection(ProcState),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+ssl_login_name(Sock) ->
+ case rabbit_net:peercert(Sock) of
+ {ok, C} -> case rabbit_ssl:peer_cert_auth_name(C) of
+ unsafe -> none;
+ not_found -> none;
+ Name -> Name
+ end;
+ {error, no_peercert} -> none;
+ nossl -> none
+ end.
+
+%%----------------------------------------------------------------------------
+
+log_tls_alert(handshake_failure, ConnStr) ->
+ rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: handshake failure~n",
+ [ConnStr]);
+log_tls_alert(unknown_ca, ConnStr) ->
+ rabbit_log_connection:error("MQTT detected TLS certificate verification error on ~s: alert 'unknown CA'~n",
+ [ConnStr]);
+log_tls_alert(Alert, ConnStr) ->
+ rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: alert ~s~n",
+ [ConnStr, Alert]).
+
+log_new_connection(#state{conn_name = ConnStr, proc_state = PState}) ->
+ rabbit_log_connection:info("accepting MQTT connection ~p (~s, client id: ~s)~n",
+ [self(), ConnStr, rabbit_mqtt_processor:info(client_id, PState)]).
+
+process_received_bytes(<<>>, State = #state{proc_state = ProcState,
+ received_connect_frame = false}) ->
+ MqttConn = ProcState#proc_state.connection,
+ case MqttConn of
+ undefined -> ok;
+ _ -> log_new_connection(State)
+ end,
+ {noreply, ensure_stats_timer(State#state{ received_connect_frame = true }), hibernate};
+process_received_bytes(<<>>, State) ->
+ {noreply, ensure_stats_timer(State), hibernate};
+process_received_bytes(Bytes,
+ State = #state{ parse_state = ParseState,
+ proc_state = ProcState,
+ conn_name = ConnStr }) ->
+ case parse(Bytes, ParseState) of
+ {more, ParseState1} ->
+ {noreply,
+ ensure_stats_timer( State #state{ parse_state = ParseState1 }),
+ hibernate};
+ {ok, Frame, Rest} ->
+ case rabbit_mqtt_processor:process_frame(Frame, ProcState) of
+ {ok, ProcState1, ConnPid} ->
+ PS = rabbit_mqtt_frame:initial_state(),
+ process_received_bytes(
+ Rest,
+ State #state{ parse_state = PS,
+ proc_state = ProcState1,
+ connection = ConnPid });
+ %% PUBLISH and more
+ {error, unauthorized = Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT connection ~s is closing due to an authorization failure~n", [ConnStr]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ %% CONNECT frames only
+ {error, unauthenticated = Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT connection ~s is closing due to an authentication failure~n", [ConnStr]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ %% CONNECT frames only
+ {error, invalid_client_id = Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT cannot accept connection ~s: client uses an invalid ID~n", [ConnStr]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ %% CONNECT frames only
+ {error, unsupported_protocol_version = Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT cannot accept connection ~s: incompatible protocol version~n", [ConnStr]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ {error, unavailable = Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT cannot accept connection ~s due to an internal error or unavailable component~n",
+ [ConnStr]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ {error, Reason, ProcState1} ->
+ rabbit_log_connection:error("MQTT protocol error on connection ~s: ~p~n",
+ [ConnStr, Reason]),
+ {stop, {shutdown, Reason}, pstate(State, ProcState1)};
+ {error, Error} ->
+ rabbit_log_connection:error("MQTT detected a framing error on connection ~s: ~p~n",
+ [ConnStr, Error]),
+ {stop, {shutdown, Error}, State};
+ {stop, ProcState1} ->
+ {stop, normal, pstate(State, ProcState1)}
+ end;
+ {error, {cannot_parse, Error, Stacktrace}} ->
+ rabbit_log_connection:error("MQTT cannot parse a frame on connection '~s', unparseable payload: ~p, error: {~p, ~p} ~n",
+ [ConnStr, Bytes, Error, Stacktrace]),
+ {stop, {shutdown, Error}, State};
+ {error, Error} ->
+ rabbit_log_connection:error("MQTT detected a framing error on connection ~s: ~p~n",
+ [ConnStr, Error]),
+ {stop, {shutdown, Error}, State}
+ end.
+
+callback_reply(State, {ok, ProcState}) ->
+ {noreply, pstate(State, ProcState), hibernate};
+callback_reply(State, {error, Reason, ProcState}) ->
+ {stop, Reason, pstate(State, ProcState)}.
+
+start_keepalive(_, 0 ) -> ok;
+start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalives, Keepalive}.
+
+pstate(State = #state {}, PState = #proc_state{}) ->
+ State #state{ proc_state = PState }.
+
+%%----------------------------------------------------------------------------
+parse(Bytes, ParseState) ->
+ try
+ rabbit_mqtt_frame:parse(Bytes, ParseState)
+ catch
+ _:Reason:Stacktrace ->
+ {error, {cannot_parse, Reason, Stacktrace}}
+ end.
+
+send_will_and_terminate(PState, State) ->
+ send_will_and_terminate(PState, {shutdown, conn_closed}, State).
+
+send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) ->
+ rabbit_mqtt_processor:send_will(PState),
+ rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]),
+ % todo: flush channel after publish
+ {stop, Reason, State}.
+
+network_error(closed,
+ State = #state{conn_name = ConnStr,
+ proc_state = PState}) ->
+ MqttConn = PState#proc_state.connection,
+ Fmt = "MQTT connection ~p will terminate because peer closed TCP connection~n",
+ Args = [ConnStr],
+ case MqttConn of
+ undefined -> rabbit_log_connection:debug(Fmt, Args);
+ _ -> rabbit_log_connection:info(Fmt, Args)
+ end,
+ send_will_and_terminate(PState, State);
+
+network_error(Reason,
+ State = #state{conn_name = ConnStr,
+ proc_state = PState}) ->
+ rabbit_log_connection:info("MQTT detected network error for ~p: ~p~n",
+ [ConnStr, Reason]),
+ send_will_and_terminate(PState, State).
+
+run_socket(State = #state{ connection_state = blocked }) ->
+ State;
+run_socket(State = #state{ deferred_recv = Data }) when Data =/= undefined ->
+ State;
+run_socket(State = #state{ await_recv = true }) ->
+ State;
+run_socket(State = #state{ socket = Sock }) ->
+ rabbit_net:setopts(Sock, [{active, once}]),
+ State#state{ await_recv = true }.
+
+control_throttle(State = #state{ connection_state = Flow,
+ conserve = Conserve }) ->
+ case {Flow, Conserve orelse credit_flow:blocked()} of
+ {running, true} -> ok = rabbit_heartbeat:pause_monitor(
+ State#state.keepalive),
+ State #state{ connection_state = blocked };
+ {blocked, false} -> ok = rabbit_heartbeat:resume_monitor(
+ State#state.keepalive),
+ run_socket(State #state{
+ connection_state = running });
+ {_, _} -> run_socket(State)
+ end.
+
+maybe_process_deferred_recv(State = #state{ deferred_recv = undefined }) ->
+ {noreply, State, hibernate};
+maybe_process_deferred_recv(State = #state{ deferred_recv = Data, socket = Sock }) ->
+ handle_info({tcp, Sock, Data},
+ State#state{ deferred_recv = undefined }).
+
+maybe_emit_stats(undefined) ->
+ ok;
+maybe_emit_stats(State) ->
+ rabbit_event:if_enabled(State, #state.stats_timer,
+ fun() -> emit_stats(State) end).
+
+emit_stats(State=#state{connection = C}) when C == none; C == undefined ->
+ %% Avoid emitting stats on terminate when the connection has not yet been
+ %% established, as this causes orphan entries on the stats database
+ State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
+ ensure_stats_timer(State1);
+emit_stats(State) ->
+ [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I
+ = infos(?SIMPLE_METRICS, State),
+ Infos = infos(?OTHER_METRICS, State),
+ rabbit_core_metrics:connection_stats(Pid, Infos),
+ rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions),
+ rabbit_event:notify(connection_stats, Infos ++ I),
+ State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer),
+ ensure_stats_timer(State1).
+
+ensure_stats_timer(State = #state{}) ->
+ rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats).
+
+infos(Items, State) -> [{Item, info_internal(Item, State)} || Item <- Items].
+
+info_internal(pid, State) -> info_internal(connection, State);
+info_internal(SockStat, #state{socket = Sock}) when SockStat =:= recv_oct;
+ SockStat =:= recv_cnt;
+ SockStat =:= send_oct;
+ SockStat =:= send_cnt;
+ SockStat =:= send_pend ->
+ case rabbit_net:getstat(Sock, [SockStat]) of
+ {ok, [{_, N}]} when is_number(N) -> N;
+ _ -> 0
+ end;
+info_internal(state, State) -> info_internal(connection_state, State);
+info_internal(garbage_collection, _State) ->
+ rabbit_misc:get_gc_info(self());
+info_internal(reductions, _State) ->
+ {reductions, Reductions} = erlang:process_info(self(), reductions),
+ Reductions;
+info_internal(conn_name, #state{conn_name = Val}) ->
+ rabbit_data_coercion:to_binary(Val);
+info_internal(connection_state, #state{received_connect_frame = false}) ->
+ starting;
+info_internal(connection_state, #state{connection_state = Val}) ->
+ Val;
+info_internal(connection, #state{connection = Val}) ->
+ Val;
+info_internal(Key, #state{proc_state = ProcState}) ->
+ rabbit_mqtt_processor:info(Key, ProcState).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl
new file mode 100644
index 0000000000..4b3ee95743
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl
@@ -0,0 +1,23 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retained_msg_store).
+
+-export([behaviour_info/1, table_name_for/1]).
+
+behaviour_info(callbacks) ->
+ [{new, 2},
+ {recover, 2},
+ {insert, 3},
+ {lookup, 2},
+ {delete, 2},
+ {terminate, 1}];
+behaviour_info(_Other) ->
+ undefined.
+
+table_name_for(VHost) ->
+ rabbit_mqtt_util:vhost_name_to_table_name(VHost).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl
new file mode 100644
index 0000000000..03c5942d35
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl
@@ -0,0 +1,54 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retained_msg_store_dets).
+
+-behaviour(rabbit_mqtt_retained_msg_store).
+-include("rabbit_mqtt.hrl").
+
+-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
+
+-record(store_state, {
+ %% DETS table name
+ table
+}).
+
+
+new(Dir, VHost) ->
+ Tid = open_table(Dir, VHost),
+ #store_state{table = Tid}.
+
+recover(Dir, VHost) ->
+ case open_table(Dir, VHost) of
+ {error, _} -> {error, uninitialized};
+ {ok, Tid} -> {ok, #store_state{table = Tid}}
+ end.
+
+insert(Topic, Msg, #store_state{table = T}) ->
+ ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}).
+
+lookup(Topic, #store_state{table = T}) ->
+ case dets:lookup(T, Topic) of
+ [] -> not_found;
+ [Entry] -> Entry
+ end.
+
+delete(Topic, #store_state{table = T}) ->
+ ok = dets:delete(T, Topic).
+
+terminate(#store_state{table = T}) ->
+ ok = dets:close(T).
+
+open_table(Dir, VHost) ->
+ dets:open_file(rabbit_mqtt_retained_msg_store:table_name_for(VHost),
+ table_options(rabbit_mqtt_util:path_for(Dir, VHost, ".dets"))).
+
+table_options(Path) ->
+ [{type, set}, {keypos, #retained_message.topic},
+ {file, Path}, {ram_file, true}, {repair, true},
+ {auto_save, rabbit_misc:get_env(rabbit_mqtt,
+ retained_message_store_dets_sync_interval, 2000)}].
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl
new file mode 100644
index 0000000000..9080a6f4cf
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl
@@ -0,0 +1,54 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retained_msg_store_ets).
+
+-behaviour(rabbit_mqtt_retained_msg_store).
+-include("rabbit_mqtt.hrl").
+
+-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
+
+-record(store_state, {
+ %% ETS table ID
+ table,
+ %% where the table is stored on disk
+ filename
+}).
+
+
+new(Dir, VHost) ->
+ Path = rabbit_mqtt_util:path_for(Dir, VHost),
+ TableName = rabbit_mqtt_retained_msg_store:table_name_for(VHost),
+ file:delete(Path),
+ Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]),
+ #store_state{table = Tid, filename = Path}.
+
+recover(Dir, VHost) ->
+ Path = rabbit_mqtt_util:path_for(Dir, VHost),
+ case ets:file2tab(Path) of
+ {ok, Tid} -> file:delete(Path),
+ {ok, #store_state{table = Tid, filename = Path}};
+ {error, _} -> {error, uninitialized}
+ end.
+
+insert(Topic, Msg, #store_state{table = T}) ->
+ true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}),
+ ok.
+
+lookup(Topic, #store_state{table = T}) ->
+ case ets:lookup(T, Topic) of
+ [] -> not_found;
+ [Entry] -> Entry
+ end.
+
+delete(Topic, #store_state{table = T}) ->
+ true = ets:delete(T, Topic),
+ ok.
+
+terminate(#store_state{table = T, filename = Path}) ->
+ ok = ets:tab2file(T, Path,
+ [{extended_info, [object_count]}]).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl
new file mode 100644
index 0000000000..382ffbc63d
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl
@@ -0,0 +1,31 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retained_msg_store_noop).
+
+-behaviour(rabbit_mqtt_retained_msg_store).
+-include("rabbit_mqtt.hrl").
+
+-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]).
+
+new(_Dir, _VHost) ->
+ ok.
+
+recover(_Dir, _VHost) ->
+ {ok, ok}.
+
+insert(_Topic, _Msg, _State) ->
+ ok.
+
+lookup(_Topic, _State) ->
+ not_found.
+
+delete(_Topic, _State) ->
+ ok.
+
+terminate(_State) ->
+ ok.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl
new file mode 100644
index 0000000000..2aa873ecfb
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl
@@ -0,0 +1,98 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retainer).
+
+-behaviour(gen_server2).
+-include("rabbit_mqtt.hrl").
+-include("rabbit_mqtt_frame.hrl").
+
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3, start_link/2]).
+
+-export([retain/3, fetch/2, clear/2, store_module/0]).
+
+-define(SERVER, ?MODULE).
+-define(TIMEOUT, 30000).
+
+-record(retainer_state, {store_mod,
+ store}).
+
+-spec retain(pid(), string(), mqtt_msg()) ->
+ {noreply, NewState :: term()} |
+ {noreply, NewState :: term(), timeout() | hibernate} |
+ {stop, Reason :: term(), NewState :: term()}.
+
+%%----------------------------------------------------------------------------
+
+start_link(RetainStoreMod, VHost) ->
+ gen_server2:start_link(?MODULE, [RetainStoreMod, VHost], []).
+
+retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) ->
+ gen_server2:cast(Pid, {retain, Topic, Msg});
+
+retain(_Pid, _Topic, Msg = #mqtt_msg{retain = false}) ->
+ throw({error, {retain_is_false, Msg}}).
+
+fetch(Pid, Topic) ->
+ gen_server2:call(Pid, {fetch, Topic}, ?TIMEOUT).
+
+clear(Pid, Topic) ->
+ gen_server2:cast(Pid, {clear, Topic}).
+
+%%----------------------------------------------------------------------------
+
+init([StoreMod, VHost]) ->
+ process_flag(trap_exit, true),
+ State = case StoreMod:recover(store_dir(), VHost) of
+ {ok, Store} -> #retainer_state{store = Store,
+ store_mod = StoreMod};
+ {error, _} -> #retainer_state{store = StoreMod:new(store_dir(), VHost),
+ store_mod = StoreMod}
+ end,
+ {ok, State}.
+
+store_module() ->
+ case application:get_env(rabbitmq_mqtt, retained_message_store) of
+ {ok, Mod} -> Mod;
+ undefined -> undefined
+ end.
+
+%%----------------------------------------------------------------------------
+
+handle_cast({retain, Topic, Msg},
+ State = #retainer_state{store = Store, store_mod = Mod}) ->
+ ok = Mod:insert(Topic, Msg, Store),
+ {noreply, State};
+handle_cast({clear, Topic},
+ State = #retainer_state{store = Store, store_mod = Mod}) ->
+ ok = Mod:delete(Topic, Store),
+ {noreply, State}.
+
+handle_call({fetch, Topic}, _From,
+ State = #retainer_state{store = Store, store_mod = Mod}) ->
+ Reply = case Mod:lookup(Topic, Store) of
+ #retained_message{mqtt_msg = Msg} -> Msg;
+ not_found -> undefined
+ end,
+ {reply, Reply, State}.
+
+handle_info(stop, State) ->
+ {stop, normal, State};
+
+handle_info(Info, State) ->
+ {stop, {unknown_info, Info}, State}.
+
+store_dir() ->
+ rabbit_mnesia:dir().
+
+terminate(_Reason, #retainer_state{store = Store, store_mod = Mod}) ->
+ Mod:terminate(Store),
+ ok.
+
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl
new file mode 100644
index 0000000000..86b54ce3d7
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl
@@ -0,0 +1,60 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_retainer_sup).
+-behaviour(supervisor2).
+
+-export([start_link/1, init/1, start_child/2,start_child/1, child_for_vhost/1,
+ delete_child/1]).
+
+-define(ENCODING, utf8).
+
+-spec start_child(binary()) -> supervisor2:startchild_ret().
+-spec start_child(term(), binary()) -> supervisor2:startchild_ret().
+
+start_link(SupName) ->
+ supervisor2:start_link(SupName, ?MODULE, []).
+
+child_for_vhost(VHost) when is_binary(VHost) ->
+ case rabbit_mqtt_retainer_sup:start_child(VHost) of
+ {ok, Pid} -> Pid;
+ {error, {already_started, Pid}} -> Pid
+ end.
+
+start_child(VHost) when is_binary(VHost) ->
+ start_child(rabbit_mqtt_retainer:store_module(), VHost).
+
+start_child(RetainStoreMod, VHost) ->
+ supervisor2:start_child(?MODULE,
+
+ {vhost_to_atom(VHost),
+ {rabbit_mqtt_retainer, start_link, [RetainStoreMod, VHost]},
+ permanent, 60, worker, [rabbit_mqtt_retainer]}).
+
+delete_child(VHost) ->
+ Id = vhost_to_atom(VHost),
+ ok = supervisor2:terminate_child(?MODULE, Id),
+ ok = supervisor2:delete_child(?MODULE, Id).
+
+init([]) ->
+ Mod = rabbit_mqtt_retainer:store_module(),
+ rabbit_log:info("MQTT retained message store: ~p~n",
+ [Mod]),
+ {ok, {{one_for_one, 5, 5}, child_specs(Mod, rabbit_vhost:list_names())}}.
+
+child_specs(Mod, VHosts) ->
+ %% see start_child/2
+ [{vhost_to_atom(V),
+ {rabbit_mqtt_retainer, start_link, [Mod, V]},
+ permanent, infinity, worker, [rabbit_mqtt_retainer]} || V <- VHosts].
+
+vhost_to_atom(VHost) ->
+ %% we'd like to avoid any conversion here because
+ %% this atom isn't meant to be human-readable, only
+ %% unique. This makes sure we don't get noisy process restarts
+ %% with really unusual vhost names used by various HTTP API test suites
+ rabbit_data_coercion:to_atom(VHost, latin1).
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
new file mode 100644
index 0000000000..c00be457d3
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl
@@ -0,0 +1,73 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_sup).
+-behaviour(supervisor2).
+
+-include_lib("rabbit_common/include/rabbit.hrl").
+
+-export([start_link/2, init/1, stop_listeners/0]).
+
+-define(TCP_PROTOCOL, 'mqtt').
+-define(TLS_PROTOCOL, 'mqtt/ssl').
+
+start_link(Listeners, []) ->
+ supervisor2:start_link({local, ?MODULE}, ?MODULE, [Listeners]).
+
+init([{Listeners, SslListeners0}]) ->
+ NumTcpAcceptors = application:get_env(rabbitmq_mqtt, num_tcp_acceptors, 10),
+ {ok, SocketOpts} = application:get_env(rabbitmq_mqtt, tcp_listen_options),
+ {SslOpts, NumSslAcceptors, SslListeners}
+ = case SslListeners0 of
+ [] -> {none, 0, []};
+ _ -> {rabbit_networking:ensure_ssl(),
+ application:get_env(rabbitmq_mqtt, num_ssl_acceptors, 10),
+ case rabbit_networking:poodle_check('MQTT') of
+ ok -> SslListeners0;
+ danger -> []
+ end}
+ end,
+ {ok, {{one_for_all, 10, 10},
+ [{rabbit_mqtt_retainer_sup,
+ {rabbit_mqtt_retainer_sup, start_link, [{local, rabbit_mqtt_retainer_sup}]},
+ transient, ?SUPERVISOR_WAIT, supervisor, [rabbit_mqtt_retainer_sup]} |
+ listener_specs(fun tcp_listener_spec/1,
+ [SocketOpts, NumTcpAcceptors], Listeners) ++
+ listener_specs(fun ssl_listener_spec/1,
+ [SocketOpts, SslOpts, NumSslAcceptors], SslListeners)]}}.
+
+stop_listeners() ->
+ rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL),
+ rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL),
+ ok.
+
+%%
+%% Implementation
+%%
+
+listener_specs(Fun, Args, Listeners) ->
+ [Fun([Address | Args]) ||
+ Listener <- Listeners,
+ Address <- rabbit_networking:tcp_listener_addresses(Listener)].
+
+tcp_listener_spec([Address, SocketOpts, NumAcceptors]) ->
+ rabbit_networking:tcp_listener_spec(
+ rabbit_mqtt_listener_sup, Address, SocketOpts,
+ transport(?TCP_PROTOCOL), rabbit_mqtt_connection_sup, [],
+ mqtt, NumAcceptors, "MQTT TCP listener").
+
+ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors]) ->
+ rabbit_networking:tcp_listener_spec(
+ rabbit_mqtt_listener_sup, Address, SocketOpts ++ SslOpts,
+ transport(?TLS_PROTOCOL), rabbit_mqtt_connection_sup, [],
+ 'mqtt/ssl', NumAcceptors, "MQTT TLS listener").
+
+transport(Protocol) ->
+ case Protocol of
+ ?TCP_PROTOCOL -> ranch_tcp;
+ ?TLS_PROTOCOL -> ranch_ssl
+ end.
diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
new file mode 100644
index 0000000000..0fbe7e8a85
--- /dev/null
+++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl
@@ -0,0 +1,139 @@
+%% This Source Code Form is subject to the terms of the Mozilla Public
+%% License, v. 2.0. If a copy of the MPL was not distributed with this
+%% file, You can obtain one at https://mozilla.org/MPL/2.0/.
+%%
+%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved.
+%%
+
+-module(rabbit_mqtt_util).
+
+-include("rabbit_mqtt.hrl").
+
+-export([subcription_queue_name/1,
+ gen_client_id/0,
+ env/1,
+ table_lookup/2,
+ path_for/2,
+ path_for/3,
+ vhost_name_to_table_name/1,
+ get_topic_translation_funs/0
+ ]).
+
+-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12).
+
+subcription_queue_name(ClientId) ->
+ Base = "mqtt-subscription-" ++ ClientId ++ "qos",
+ {list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}.
+
+cached(CacheName, Fun, Arg) ->
+ Cache =
+ case get(CacheName) of
+ undefined ->
+ [];
+ Other ->
+ Other
+ end,
+ case lists:keyfind(Arg, 1, Cache) of
+ {_, V} ->
+ V;
+ false ->
+ V = Fun(Arg),
+ CacheTail = lists:sublist(Cache, ?MAX_TOPIC_TRANSLATION_CACHE_SIZE - 1),
+ put(CacheName, [{Arg, V} | CacheTail]),
+ V
+ end.
+
+to_amqp(T0) ->
+ T1 = string:replace(T0, "/", ".", all),
+ T2 = string:replace(T1, "+", "*", all),
+ erlang:iolist_to_binary(T2).
+
+to_mqtt(T0) ->
+ T1 = string:replace(T0, "*", "+", all),
+ T2 = string:replace(T1, ".", "/", all),
+ erlang:iolist_to_binary(T2).
+
+%% amqp mqtt descr
+%% * + match one topic level
+%% # # match multiple topic levels
+%% . / topic level separator
+get_topic_translation_funs() ->
+ SparkplugB = env(sparkplug),
+ ToAmqpFun = fun(Topic) ->
+ cached(mta_cache, fun to_amqp/1, Topic)
+ end,
+ ToMqttFun = fun(Topic) ->
+ cached(atm_cache, fun to_mqtt/1, Topic)
+ end,
+ {M2AFun, A2MFun} = case SparkplugB of
+ true ->
+ {ok, M2A_SpRe} = re:compile("^sp[AB]v\\d+\\.\\d+/"),
+ {ok, A2M_SpRe} = re:compile("^sp[AB]v\\d+___\\d+\\."),
+ M2A = fun(T0) ->
+ case re:run(T0, M2A_SpRe) of
+ nomatch ->
+ ToAmqpFun(T0);
+ {match, _} ->
+ T1 = string:replace(T0, ".", "___", leading),
+ ToAmqpFun(T1)
+ end
+ end,
+ A2M = fun(T0) ->
+ case re:run(T0, A2M_SpRe) of
+ nomatch ->
+ ToMqttFun(T0);
+ {match, _} ->
+ T1 = ToMqttFun(T0),
+ T2 = string:replace(T1, "___", ".", leading),
+ erlang:iolist_to_binary(T2)
+ end
+ end,
+ {M2A, A2M};
+ _ ->
+ M2A = fun(T) ->
+ ToAmqpFun(T)
+ end,
+ A2M = fun(T) ->
+ ToMqttFun(T)
+ end,
+ {M2A, A2M}
+ end,
+ {ok, {mqtt2amqp_fun, M2AFun}, {amqp2mqtt_fun, A2MFun}}.
+
+gen_client_id() ->
+ lists:nthtail(1, rabbit_guid:string(rabbit_guid:gen_secure(), [])).
+
+env(Key) ->
+ case application:get_env(rabbitmq_mqtt, Key) of
+ {ok, Val} -> coerce_env_value(Key, Val);
+ undefined -> undefined
+ end.
+
+%% TODO: move to rabbit_common
+coerce_env_value(default_pass, Val) -> rabbit_data_coercion:to_binary(Val);
+coerce_env_value(default_user, Val) -> rabbit_data_coercion:to_binary(Val);
+coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val);
+coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val);
+coerce_env_value(_, Val) -> Val.
+
+table_lookup(undefined, _Key) ->
+ undefined;
+table_lookup(Table, Key) ->
+ rabbit_misc:table_lookup(Table, Key).
+
+vhost_name_to_dir_name(VHost) ->
+ vhost_name_to_dir_name(VHost, ".ets").
+vhost_name_to_dir_name(VHost, Suffix) ->
+ <<Num:128>> = erlang:md5(VHost),
+ "mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num]) ++ Suffix.
+
+path_for(Dir, VHost) ->
+ filename:join(Dir, vhost_name_to_dir_name(VHost)).
+
+path_for(Dir, VHost, Suffix) ->
+ filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)).
+
+
+vhost_name_to_table_name(VHost) ->
+ <<Num:128>> = erlang:md5(VHost),
+ list_to_atom("rabbit_mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num])).