diff options
Diffstat (limited to 'deps/rabbitmq_mqtt/src')
20 files changed, 2967 insertions, 0 deletions
diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl new file mode 100644 index 0000000000..f0aefb526b --- /dev/null +++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand.erl @@ -0,0 +1,68 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.DecommissionMqttNodeCommand'). + +-include("rabbit_mqtt.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([scopes/0, + switches/0, + aliases/0, + usage/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +scopes() -> [ctl]. +switches() -> []. +aliases() -> []. + +description() -> <<"Removes cluster member and permanently deletes its cluster-wide MQTT state">>. + +help_section() -> + {plugin, mqtt}. + +validate([], _Opts) -> + {validation_failure, not_enough_args}; +validate([_, _ | _], _Opts) -> + {validation_failure, too_many_args}; +validate([_], _) -> + ok. + +merge_defaults(Args, Opts) -> + {Args, Opts}. + +usage() -> + <<"decommission_mqtt_node <node>">>. + +usage_doc_guides() -> + [?MQTT_GUIDE_URL]. + +run([Node], #{node := NodeName, + timeout := Timeout}) -> + case rabbit_misc:rpc_call(NodeName, rabbit_mqtt_collector, leave, [Node], Timeout) of + {badrpc, _} = Error -> + Error; + nodedown -> + {ok, list_to_binary(io_lib:format("Node ~s is down but has been successfully removed" + " from the cluster", [Node]))}; + Result -> + %% 'ok' or 'timeout' + %% TODO: Ra will timeout if the node is not a cluster member - should this be fixed?? + Result + end. + +banner([Node], _) -> list_to_binary(io_lib:format("Removing node ~s from the list of MQTT nodes...", [Node])). + +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl new file mode 100644 index 0000000000..a5745a7f58 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand.erl @@ -0,0 +1,87 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. + +-module('Elixir.RabbitMQ.CLI.Ctl.Commands.ListMqttConnectionsCommand'). + +-include("rabbit_mqtt.hrl"). + +-behaviour('Elixir.RabbitMQ.CLI.CommandBehaviour'). + +-export([formatter/0, + scopes/0, + switches/0, + aliases/0, + usage/0, + usage_additional/0, + usage_doc_guides/0, + banner/2, + validate/2, + merge_defaults/2, + run/2, + output/2, + description/0, + help_section/0]). + +formatter() -> 'Elixir.RabbitMQ.CLI.Formatters.Table'. +scopes() -> [ctl, diagnostics]. +switches() -> [{verbose, boolean}]. +aliases() -> [{'V', verbose}]. + +description() -> <<"Lists MQTT connections on the target node">>. + +help_section() -> + {plugin, mqtt}. + +validate(Args, _) -> + case 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':validate_info_keys(Args, + ?INFO_ITEMS) of + {ok, _} -> ok; + Error -> Error + end. + +merge_defaults([], Opts) -> + merge_defaults([<<"client_id">>, <<"conn_name">>], Opts); +merge_defaults(Args, Opts) -> + {Args, maps:merge(#{verbose => false}, Opts)}. + +usage() -> + <<"list_mqtt_connections [<column> ...]">>. + +usage_additional() -> + Prefix = <<" must be one of ">>, + InfoItems = 'Elixir.Enum':join(lists:usort(?INFO_ITEMS), <<", ">>), + [ + {<<"<column>">>, <<Prefix/binary, InfoItems/binary>>} + ]. + +usage_doc_guides() -> + [?MQTT_GUIDE_URL]. + +run(Args, #{node := NodeName, + timeout := Timeout, + verbose := Verbose}) -> + InfoKeys = case Verbose of + true -> ?INFO_ITEMS; + false -> 'Elixir.RabbitMQ.CLI.Ctl.InfoKeys':prepare_info_keys(Args) + end, + + %% a node uses the Raft-based collector to list connections, which knows about all connections in the cluster + %% so no need to reach out to all the nodes + Nodes = [NodeName], + + 'Elixir.RabbitMQ.CLI.Ctl.RpcStream':receive_list_items( + NodeName, + rabbit_mqtt, + emit_connection_info_all, + [Nodes, InfoKeys], + Timeout, + InfoKeys, + length(Nodes)). + +banner(_, _) -> <<"Listing MQTT connections ...">>. + +output(Result, _Opts) -> + 'Elixir.RabbitMQ.CLI.DefaultOutput':output(Result). diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl new file mode 100644 index 0000000000..334aa9e32c --- /dev/null +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -0,0 +1,134 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(mqtt_machine). +-behaviour(ra_machine). + +-include("mqtt_machine.hrl"). + +-export([init/1, + apply/3, + state_enter/2, + notify_connection/2]). + +-type state() :: #machine_state{}. + +-type config() :: map(). + +-type reply() :: {ok, term()} | {error, term()}. +-type client_id() :: term(). + +-type command() :: {register, client_id(), pid()} | + {unregister, client_id(), pid()} | + list. + +-spec init(config()) -> state(). +init(_Conf) -> + #machine_state{}. + +-spec apply(map(), command(), state()) -> + {state(), reply(), ra_machine:effects()}. +apply(_Meta, {register, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) -> + {Effects, Ids1} = + case maps:find(ClientId, Ids) of + {ok, OldPid} when Pid =/= OldPid -> + Effects0 = [{demonitor, process, OldPid}, + {monitor, process, Pid}, + {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}], + {Effects0, maps:remove(ClientId, Ids)}; + _ -> + Effects0 = [{monitor, process, Pid}], + {Effects0, Ids} + end, + State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1)}, + {State, ok, Effects}; + +apply(Meta, {unregister, ClientId, Pid}, #machine_state{client_ids = Ids} = State0) -> + State = case maps:find(ClientId, Ids) of + {ok, Pid} -> State0#machine_state{client_ids = maps:remove(ClientId, Ids)}; + %% don't delete client id that might belong to a newer connection + %% that kicked the one with Pid out + {ok, _AnotherPid} -> State0; + error -> State0 + end, + Effects0 = [{demonitor, process, Pid}], + %% snapshot only when the map has changed + Effects = case State of + State0 -> Effects0; + _ -> Effects0 ++ snapshot_effects(Meta, State) + end, + {State, ok, Effects}; + +apply(_Meta, {down, DownPid, noconnection}, State) -> + %% Monitor the node the pid is on (see {nodeup, Node} below) + %% so that we can detect when the node is re-connected and discover the + %% actual fate of the connection processes on it + Effect = {monitor, node, node(DownPid)}, + {State, ok, Effect}; + +apply(Meta, {down, DownPid, _}, #machine_state{client_ids = Ids} = State0) -> + Ids1 = maps:filter(fun (_ClientId, Pid) when Pid =:= DownPid -> + false; + (_, _) -> + true + end, Ids), + State = State0#machine_state{client_ids = Ids1}, + Delta = maps:keys(Ids) -- maps:keys(Ids1), + Effects = lists:map(fun(Id) -> + [{mod_call, rabbit_log, debug, + ["MQTT connection with client id '~s' failed", [Id]]}] end, Delta), + {State, ok, Effects ++ snapshot_effects(Meta, State)}; + +apply(_Meta, {nodeup, Node}, State) -> + %% Work out if any pids that were disconnected are still + %% alive. + %% Re-request the monitor for the pids on the now-back node. + Effects = [{monitor, process, Pid} || Pid <- all_pids(State), node(Pid) == Node], + {State, ok, Effects}; +apply(_Meta, {nodedown, _Node}, State) -> + {State, ok}; + +apply(Meta, {leave, Node}, #machine_state{client_ids = Ids} = State0) -> + Ids1 = maps:filter(fun (_ClientId, Pid) -> node(Pid) =/= Node end, Ids), + Delta = maps:keys(Ids) -- maps:keys(Ids1), + + Effects = lists:foldl(fun (ClientId, Acc) -> + Pid = maps:get(ClientId, Ids), + [ + {demonitor, process, Pid}, + {mod_call, ?MODULE, notify_connection, [Pid, decommission_node]}, + {mod_call, rabbit_log, debug, + ["MQTT will remove client ID '~s' from known " + "as its node has been decommissioned", [ClientId]]} + ] ++ Acc + end, [], Delta), + + State = State0#machine_state{client_ids = Ids1}, + {State, ok, Effects ++ snapshot_effects(Meta, State)}; + +apply(_Meta, Unknown, State) -> + error_logger:error_msg("MQTT Raft state machine received unknown command ~p~n", [Unknown]), + {State, {error, {unknown_command, Unknown}}, []}. + +state_enter(leader, State) -> + %% re-request monitors for all known pids, this would clean up + %% records for all connections are no longer around, e.g. right after node restart + [{monitor, process, Pid} || Pid <- all_pids(State)]; +state_enter(_, _) -> + []. + +%% ========================== + +%% Avoids blocking the Raft leader. +notify_connection(Pid, Reason) -> + spawn(fun() -> gen_server2:cast(Pid, Reason) end). + +-spec snapshot_effects(map(), state()) -> ra_machine:effects(). +snapshot_effects(#{index := RaftIdx}, State) -> + [{release_cursor, RaftIdx, State}]. + +all_pids(#machine_state{client_ids = Ids}) -> + maps:values(Ids). diff --git a/deps/rabbitmq_mqtt/src/mqtt_node.erl b/deps/rabbitmq_mqtt/src/mqtt_node.erl new file mode 100644 index 0000000000..84dcd9b3a4 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/mqtt_node.erl @@ -0,0 +1,132 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(mqtt_node). + +-export([start/0, node_id/0, server_id/0, all_node_ids/0, leave/1, trigger_election/0]). + +-define(ID_NAME, mqtt_node). +-define(START_TIMEOUT, 100000). +-define(RETRY_INTERVAL, 5000). +-define(RA_OPERATION_TIMEOUT, 60000). + +node_id() -> + server_id(node()). + +server_id() -> + server_id(node()). + +server_id(Node) -> + {?ID_NAME, Node}. + +all_node_ids() -> + [server_id(N) || N <- rabbit_mnesia:cluster_nodes(all), + can_participate_in_clientid_tracking(N)]. + +start() -> + %% 3s to 6s randomized + Repetitions = rand:uniform(10) + 10, + start(300, Repetitions). + +start(_Delay, AttemptsLeft) when AttemptsLeft =< 0 -> + start_server(), + trigger_election(); +start(Delay, AttemptsLeft) -> + NodeId = server_id(), + Nodes = compatible_peer_servers(), + case ra_directory:uid_of(?ID_NAME) of + undefined -> + case Nodes of + [] -> + %% Since cluster members are not known ahead of time and initial boot can be happening in parallel, + %% we wait and check a few times (up to a few seconds) to see if we can discover any peers to + %% join before forming a cluster. This reduces the probability of N independent clusters being + %% formed in the common scenario of N nodes booting in parallel e.g. because they were started + %% at the same time by a deployment tool. + %% + %% This scenario does not guarantee single cluster formation but without knowing the list of members + %% ahead of time, this is a best effort workaround. Multi-node consensus is apparently hard + %% to achieve without having consensus around expected cluster members. + rabbit_log:info("MQTT: will wait for ~p more ms for cluster members to join before triggering a Raft leader election", [Delay]), + timer:sleep(Delay), + start(Delay, AttemptsLeft - 1); + Peers -> + %% Trigger an election. + %% This is required when we start a node for the first time. + %% Using default timeout because it supposed to reply fast. + rabbit_log:info("MQTT: discovered ~p cluster peers that support client ID tracking", [length(Peers)]), + start_server(), + join_peers(NodeId, Peers), + ra:trigger_election(NodeId, ?RA_OPERATION_TIMEOUT) + end; + _ -> + join_peers(NodeId, Nodes), + ra:restart_server(NodeId), + ra:trigger_election(NodeId) + end, + ok. + +compatible_peer_servers() -> + all_node_ids() -- [(node_id())]. + +start_server() -> + NodeId = node_id(), + Nodes = compatible_peer_servers(), + UId = ra:new_uid(ra_lib:to_binary(?ID_NAME)), + Timeout = application:get_env(kernel, net_ticktime, 60) + 5, + Conf = #{cluster_name => ?ID_NAME, + id => NodeId, + uid => UId, + friendly_name => ?ID_NAME, + initial_members => Nodes, + log_init_args => #{uid => UId}, + tick_timeout => Timeout, + machine => {module, mqtt_machine, #{}} + }, + ra:start_server(Conf). + +trigger_election() -> + ra:trigger_election(server_id()). + +join_peers(_NodeId, []) -> + ok; +join_peers(NodeId, Nodes) -> + join_peers(NodeId, Nodes, 100). +join_peers(_NodeId, [], _RetriesLeft) -> + ok; +join_peers(_NodeId, _Nodes, RetriesLeft) when RetriesLeft =:= 0 -> + rabbit_log:error("MQTT: exhausted all attempts while trying to rejoin cluster peers"); +join_peers(NodeId, Nodes, RetriesLeft) -> + case ra:members(Nodes, ?START_TIMEOUT) of + {ok, Members, _} -> + case lists:member(NodeId, Members) of + true -> ok; + false -> ra:add_member(Members, NodeId) + end; + {timeout, _} -> + rabbit_log:debug("MQTT: timed out contacting cluster peers, %s retries left", [RetriesLeft]), + timer:sleep(?RETRY_INTERVAL), + join_peers(NodeId, Nodes, RetriesLeft - 1); + Err -> + Err + end. + +-spec leave(node()) -> 'ok' | 'timeout' | 'nodedown'. +leave(Node) -> + NodeId = server_id(), + ToLeave = server_id(Node), + try + ra:leave_and_delete_server(NodeId, ToLeave) + catch + exit:{{nodedown, Node}, _} -> + nodedown + end. + +can_participate_in_clientid_tracking(Node) -> + case rpc:call(Node, mqtt_machine, module_info, []) of + {badrpc, _} -> false; + _ -> true + end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl new file mode 100644 index 0000000000..192f8a7fee --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt.erl @@ -0,0 +1,55 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt). + +-behaviour(application). +-export([start/2, stop/1]). +-export([connection_info_local/1, + emit_connection_info_local/3, + emit_connection_info_all/4, + close_all_client_connections/1]). + +start(normal, []) -> + {ok, Listeners} = application:get_env(tcp_listeners), + {ok, SslListeners} = application:get_env(ssl_listeners), + ok = mqtt_node:start(), + Result = rabbit_mqtt_sup:start_link({Listeners, SslListeners}, []), + EMPid = case rabbit_event:start_link() of + {ok, Pid} -> Pid; + {error, {already_started, Pid}} -> Pid + end, + gen_event:add_handler(EMPid, rabbit_mqtt_internal_event_handler, []), + Result. + +stop(_) -> + rabbit_mqtt_sup:stop_listeners(). + +-spec close_all_client_connections(string() | binary()) -> {'ok', non_neg_integer()}. +close_all_client_connections(Reason) -> + Connections = rabbit_mqtt_collector:list(), + [rabbit_mqtt_reader:close_connection(Pid, Reason) || {_, Pid} <- Connections], + {ok, length(Connections)}. + +emit_connection_info_all(Nodes, Items, Ref, AggregatorPid) -> + Pids = [spawn_link(Node, rabbit_mqtt, emit_connection_info_local, + [Items, Ref, AggregatorPid]) + || Node <- Nodes], + rabbit_control_misc:await_emitters_termination(Pids), + ok. + +emit_connection_info_local(Items, Ref, AggregatorPid) -> + rabbit_control_misc:emitting_map_with_exit_handler( + AggregatorPid, Ref, fun({_, Pid}) -> + rabbit_mqtt_reader:info(Pid, Items) + end, + rabbit_mqtt_collector:list()). + +connection_info_local(Items) -> + Connections = rabbit_mqtt_collector:list(), + [rabbit_mqtt_reader:info(Pid, Items) + || {_, Pid} <- Connections]. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl new file mode 100644 index 0000000000..341ee46850 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_collector.erl @@ -0,0 +1,88 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_collector). + +-include("mqtt_machine.hrl"). + +-export([register/2, register/3, unregister/2, list/0, leave/1]). + +%%---------------------------------------------------------------------------- +-spec register(term(), pid()) -> {ok, reference()} | {error, term()}. +register(ClientId, Pid) -> + {ClusterName, _} = NodeId = mqtt_node:server_id(), + case ra_leaderboard:lookup_leader(ClusterName) of + undefined -> + case ra:members(NodeId) of + {ok, _, Leader} -> + register(Leader, ClientId, Pid); + _ = Error -> + Error + end; + Leader -> + register(Leader, ClientId, Pid) + end. + +-spec register(ra:server_id(), term(), pid()) -> + {ok, reference()} | {error, term()}. +register(ServerId, ClientId, Pid) -> + Corr = make_ref(), + send_ra_command(ServerId, {register, ClientId, Pid}, Corr), + erlang:send_after(5000, self(), {ra_event, undefined, register_timeout}), + {ok, Corr}. + +unregister(ClientId, Pid) -> + {ClusterName, _} = mqtt_node:server_id(), + case ra_leaderboard:lookup_leader(ClusterName) of + undefined -> + ok; + Leader -> + send_ra_command(Leader, {unregister, ClientId, Pid}, no_correlation) + end. + +list() -> + {ClusterName, _} = mqtt_node:server_id(), + QF = fun (#machine_state{client_ids = Ids}) -> maps:to_list(Ids) end, + case ra_leaderboard:lookup_leader(ClusterName) of + undefined -> + NodeIds = mqtt_node:all_node_ids(), + case ra:leader_query(NodeIds, QF) of + {ok, {_, Ids}, _} -> Ids; + {timeout, _} -> + rabbit_log:debug("~s:list/0 leader query timed out", + [?MODULE]), + [] + end; + Leader -> + case ra:leader_query(Leader, QF) of + {ok, {_, Ids}, _} -> Ids; + {error, _} -> + []; + {timeout, _} -> + rabbit_log:debug("~s:list/0 leader query timed out", + [?MODULE]), + [] + end + end. + +leave(NodeBin) -> + Node = binary_to_atom(NodeBin, utf8), + ServerId = mqtt_node:server_id(), + run_ra_command(ServerId, {leave, Node}), + mqtt_node:leave(Node). + +%%---------------------------------------------------------------------------- +-spec run_ra_command(term(), term()) -> term() | {error, term()}. +run_ra_command(ServerId, RaCommand) -> + case ra:process_command(ServerId, RaCommand) of + {ok, Result, _} -> Result; + _ = Error -> Error + end. + +-spec send_ra_command(term(), term(), term()) -> ok. +send_ra_command(ServerId, RaCommand, Correlation) -> + ok = ra:pipeline_command(ServerId, RaCommand, Correlation, normal). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl new file mode 100644 index 0000000000..4e73a19253 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_info.erl @@ -0,0 +1,25 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2017-2020 VMware, Inc. or its affiliates. All rights reserved. +%% +-module(rabbit_mqtt_connection_info). + +%% Module to add the MQTT client ID to authentication properties + +%% API +-export([additional_authn_params/4]). + +additional_authn_params(_Creds, _VHost, _Pid, Infos) -> + case proplists:get_value(variable_map, Infos, undefined) of + VariableMap when is_map(VariableMap) -> + case maps:get(<<"client_id">>, VariableMap, []) of + ClientId when is_binary(ClientId)-> + [{client_id, ClientId}]; + [] -> + [] + end; + _ -> + [] + end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl new file mode 100644 index 0000000000..0a150caa38 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_connection_sup.erl @@ -0,0 +1,43 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_connection_sup). + +-behaviour(supervisor2). +-behaviour(ranch_protocol). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([start_link/4, start_keepalive_link/0]). + +-export([init/1]). + +%%---------------------------------------------------------------------------- + +start_link(Ref, _Sock, _Transport, []) -> + {ok, SupPid} = supervisor2:start_link(?MODULE, []), + {ok, KeepaliveSup} = supervisor2:start_child( + SupPid, + {rabbit_mqtt_keepalive_sup, + {rabbit_mqtt_connection_sup, start_keepalive_link, []}, + intrinsic, infinity, supervisor, [rabbit_keepalive_sup]}), + {ok, ReaderPid} = supervisor2:start_child( + SupPid, + {rabbit_mqtt_reader, + {rabbit_mqtt_reader, start_link, [KeepaliveSup, Ref]}, + intrinsic, ?WORKER_WAIT, worker, [rabbit_mqtt_reader]}), + {ok, SupPid, ReaderPid}. + +start_keepalive_link() -> + supervisor2:start_link(?MODULE, []). + +%%---------------------------------------------------------------------------- + +init([]) -> + {ok, {{one_for_all, 0, 1}, []}}. + + diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl new file mode 100644 index 0000000000..950c5bd6c4 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_frame.erl @@ -0,0 +1,224 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_frame). + +-export([parse/2, initial_state/0]). +-export([serialise/1]). + +-include("rabbit_mqtt_frame.hrl"). + +-define(RESERVED, 0). +-define(MAX_LEN, 16#fffffff). +-define(HIGHBIT, 2#10000000). +-define(LOWBITS, 2#01111111). + +initial_state() -> none. + +parse(<<>>, none) -> + {more, fun(Bin) -> parse(Bin, none) end}; +parse(<<MessageType:4, Dup:1, QoS:2, Retain:1, Rest/binary>>, none) -> + parse_remaining_len(Rest, #mqtt_frame_fixed{ type = MessageType, + dup = bool(Dup), + qos = QoS, + retain = bool(Retain) }); +parse(Bin, Cont) -> Cont(Bin). + +parse_remaining_len(<<>>, Fixed) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Fixed) end}; +parse_remaining_len(Rest, Fixed) -> + parse_remaining_len(Rest, Fixed, 1, 0). + +parse_remaining_len(_Bin, _Fixed, _Multiplier, Length) + when Length > ?MAX_LEN -> + {error, invalid_mqtt_frame_len}; +parse_remaining_len(<<>>, Fixed, Multiplier, Length) -> + {more, fun(Bin) -> parse_remaining_len(Bin, Fixed, Multiplier, Length) end}; +parse_remaining_len(<<1:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) -> + parse_remaining_len(Rest, Fixed, Multiplier * ?HIGHBIT, Value + Len * Multiplier); +parse_remaining_len(<<0:1, Len:7, Rest/binary>>, Fixed, Multiplier, Value) -> + parse_frame(Rest, Fixed, Value + Len * Multiplier). + +parse_frame(Bin, #mqtt_frame_fixed{ type = Type, + qos = Qos } = Fixed, Length) -> + case {Type, Bin} of + {?CONNECT, <<FrameBin:Length/binary, Rest/binary>>} -> + {ProtoName, Rest1} = parse_utf(FrameBin), + <<ProtoVersion : 8, Rest2/binary>> = Rest1, + <<UsernameFlag : 1, + PasswordFlag : 1, + WillRetain : 1, + WillQos : 2, + WillFlag : 1, + CleanSession : 1, + _Reserved : 1, + KeepAlive : 16/big, + Rest3/binary>> = Rest2, + {ClientId, Rest4} = parse_utf(Rest3), + {WillTopic, Rest5} = parse_utf(Rest4, WillFlag), + {WillMsg, Rest6} = parse_msg(Rest5, WillFlag), + {UserName, Rest7} = parse_utf(Rest6, UsernameFlag), + {PasssWord, <<>>} = parse_utf(Rest7, PasswordFlag), + case protocol_name_approved(ProtoVersion, ProtoName) of + true -> + wrap(Fixed, + #mqtt_frame_connect{ + proto_ver = ProtoVersion, + will_retain = bool(WillRetain), + will_qos = WillQos, + will_flag = bool(WillFlag), + clean_sess = bool(CleanSession), + keep_alive = KeepAlive, + client_id = ClientId, + will_topic = WillTopic, + will_msg = WillMsg, + username = UserName, + password = PasssWord}, Rest); + false -> + {error, protocol_header_corrupt} + end; + {?PUBLISH, <<FrameBin:Length/binary, Rest/binary>>} -> + {TopicName, Rest1} = parse_utf(FrameBin), + {MessageId, Payload} = case Qos of + 0 -> {undefined, Rest1}; + _ -> <<M:16/big, R/binary>> = Rest1, + {M, R} + end, + wrap(Fixed, #mqtt_frame_publish { topic_name = TopicName, + message_id = MessageId }, + Payload, Rest); + {?PUBACK, <<FrameBin:Length/binary, Rest/binary>>} -> + <<MessageId:16/big>> = FrameBin, + wrap(Fixed, #mqtt_frame_publish { message_id = MessageId }, Rest); + {Subs, <<FrameBin:Length/binary, Rest/binary>>} + when Subs =:= ?SUBSCRIBE orelse Subs =:= ?UNSUBSCRIBE -> + 1 = Qos, + <<MessageId:16/big, Rest1/binary>> = FrameBin, + Topics = parse_topics(Subs, Rest1, []), + wrap(Fixed, #mqtt_frame_subscribe { message_id = MessageId, + topic_table = Topics }, Rest); + {Minimal, Rest} + when Minimal =:= ?DISCONNECT orelse Minimal =:= ?PINGREQ -> + Length = 0, + wrap(Fixed, Rest); + {_, TooShortBin} -> + {more, fun(BinMore) -> + parse_frame(<<TooShortBin/binary, BinMore/binary>>, + Fixed, Length) + end} + end. + +parse_topics(_, <<>>, Topics) -> + Topics; +parse_topics(?SUBSCRIBE = Sub, Bin, Topics) -> + {Name, <<_:6, QoS:2, Rest/binary>>} = parse_utf(Bin), + parse_topics(Sub, Rest, [#mqtt_topic { name = Name, qos = QoS } | Topics]); +parse_topics(?UNSUBSCRIBE = Sub, Bin, Topics) -> + {Name, <<Rest/binary>>} = parse_utf(Bin), + parse_topics(Sub, Rest, [#mqtt_topic { name = Name } | Topics]). + +wrap(Fixed, Variable, Payload, Rest) -> + {ok, #mqtt_frame { variable = Variable, fixed = Fixed, payload = Payload }, Rest}. +wrap(Fixed, Variable, Rest) -> + {ok, #mqtt_frame { variable = Variable, fixed = Fixed }, Rest}. +wrap(Fixed, Rest) -> + {ok, #mqtt_frame { fixed = Fixed }, Rest}. + +parse_utf(Bin, 0) -> + {undefined, Bin}; +parse_utf(Bin, _) -> + parse_utf(Bin). + +parse_utf(<<Len:16/big, Str:Len/binary, Rest/binary>>) -> + {binary_to_list(Str), Rest}. + +parse_msg(Bin, 0) -> + {undefined, Bin}; +parse_msg(<<Len:16/big, Msg:Len/binary, Rest/binary>>, _) -> + {Msg, Rest}. + +bool(0) -> false; +bool(1) -> true. + +%% serialisation + +serialise(#mqtt_frame{ fixed = Fixed, + variable = Variable, + payload = Payload }) -> + serialise_variable(Fixed, Variable, serialise_payload(Payload)). + +serialise_payload(undefined) -> <<>>; +serialise_payload(B) when is_binary(B) -> B. + +serialise_variable(#mqtt_frame_fixed { type = ?CONNACK } = Fixed, + #mqtt_frame_connack { session_present = SessionPresent, + return_code = ReturnCode }, + <<>> = PayloadBin) -> + VariableBin = <<?RESERVED:7, (opt(SessionPresent)):1, ReturnCode:8>>, + serialise_fixed(Fixed, VariableBin, PayloadBin); + +serialise_variable(#mqtt_frame_fixed { type = SubAck } = Fixed, + #mqtt_frame_suback { message_id = MessageId, + qos_table = Qos }, + <<>> = _PayloadBin) + when SubAck =:= ?SUBACK orelse SubAck =:= ?UNSUBACK -> + VariableBin = <<MessageId:16/big>>, + QosBin = << <<?RESERVED:6, Q:2>> || Q <- Qos >>, + serialise_fixed(Fixed, VariableBin, QosBin); + +serialise_variable(#mqtt_frame_fixed { type = ?PUBLISH, + qos = Qos } = Fixed, + #mqtt_frame_publish { topic_name = TopicName, + message_id = MessageId }, + PayloadBin) -> + TopicBin = serialise_utf(TopicName), + MessageIdBin = case Qos of + 0 -> <<>>; + 1 -> <<MessageId:16/big>> + end, + serialise_fixed(Fixed, <<TopicBin/binary, MessageIdBin/binary>>, PayloadBin); + +serialise_variable(#mqtt_frame_fixed { type = ?PUBACK } = Fixed, + #mqtt_frame_publish { message_id = MessageId }, + PayloadBin) -> + MessageIdBin = <<MessageId:16/big>>, + serialise_fixed(Fixed, MessageIdBin, PayloadBin); + +serialise_variable(#mqtt_frame_fixed {} = Fixed, + undefined, + <<>> = _PayloadBin) -> + serialise_fixed(Fixed, <<>>, <<>>). + +serialise_fixed(#mqtt_frame_fixed{ type = Type, + dup = Dup, + qos = Qos, + retain = Retain }, VariableBin, PayloadBin) + when is_integer(Type) andalso ?CONNECT =< Type andalso Type =< ?DISCONNECT -> + Len = size(VariableBin) + size(PayloadBin), + true = (Len =< ?MAX_LEN), + LenBin = serialise_len(Len), + <<Type:4, (opt(Dup)):1, (opt(Qos)):2, (opt(Retain)):1, + LenBin/binary, VariableBin/binary, PayloadBin/binary>>. + +serialise_utf(String) -> + StringBin = unicode:characters_to_binary(String), + Len = size(StringBin), + true = (Len =< 16#ffff), + <<Len:16/big, StringBin/binary>>. + +serialise_len(N) when N =< ?LOWBITS -> + <<0:1, N:7>>; +serialise_len(N) -> + <<1:1, (N rem ?HIGHBIT):7, (serialise_len(N div ?HIGHBIT))/binary>>. + +opt(undefined) -> ?RESERVED; +opt(false) -> 0; +opt(true) -> 1; +opt(X) when is_integer(X) -> X. + +protocol_name_approved(Ver, Name) -> + lists:member({Ver, Name}, ?PROTOCOL_NAMES). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl new file mode 100644 index 0000000000..2a371b4142 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_internal_event_handler.erl @@ -0,0 +1,45 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_internal_event_handler). + +-behaviour(gen_event). + +-export([init/1, handle_event/2, handle_call/2, handle_info/2, terminate/2, code_change/3]). + +-import(rabbit_misc, [pget/2]). + +init([]) -> + {ok, []}. + +handle_event({event, vhost_created, Info, _, _}, State) -> + Name = pget(name, Info), + rabbit_mqtt_retainer_sup:child_for_vhost(Name), + {ok, State}; +handle_event({event, vhost_deleted, Info, _, _}, State) -> + Name = pget(name, Info), + rabbit_mqtt_retainer_sup:delete_child(Name), + {ok, State}; +handle_event({event, maintenance_connections_closed, _Info, _, _}, State) -> + %% we should close our connections + {ok, NConnections} = rabbit_mqtt:close_all_client_connections("node is being put into maintenance mode"), + rabbit_log:alert("Closed ~b local MQTT client connections", [NConnections]), + {ok, State}; +handle_event(_Event, State) -> + {ok, State}. + +handle_call(_Request, State) -> + {ok, State}. + +handle_info(_Info, State) -> + {ok, State}. + +terminate(_Reason, _State) -> + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl new file mode 100644 index 0000000000..c3a25096e6 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_processor.erl @@ -0,0 +1,1054 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_processor). + +-export([info/2, initial_state/2, initial_state/5, + process_frame/2, amqp_pub/2, amqp_callback/2, send_will/1, + close_connection/1, handle_pre_hibernate/0, + handle_ra_event/2]). + +%% for testing purposes +-export([get_vhost_username/1, get_vhost/3, get_vhost_from_user_mapping/2, + add_client_id_to_adapter_info/2]). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_mqtt_frame.hrl"). +-include("rabbit_mqtt.hrl"). + +-define(APP, rabbitmq_mqtt). +-define(FRAME_TYPE(Frame, Type), + Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}). +-define(MAX_TOPIC_PERMISSION_CACHE_SIZE, 12). + +initial_state(Socket, SSLLoginName) -> + RealSocket = rabbit_net:unwrap_socket(Socket), + {ok, {PeerAddr, _PeerPort}} = rabbit_net:peername(RealSocket), + initial_state(RealSocket, SSLLoginName, + adapter_info(Socket, 'MQTT'), + fun serialise_and_send_to_client/2, PeerAddr). + +initial_state(Socket, SSLLoginName, + AdapterInfo0 = #amqp_adapter_info{additional_info = Extra}, + SendFun, PeerAddr) -> + {ok, {mqtt2amqp_fun, M2A}, {amqp2mqtt_fun, A2M}} = + rabbit_mqtt_util:get_topic_translation_funs(), + %% MQTT connections use exactly one channel. The frame max is not + %% applicable and there is no way to know what client is used. + AdapterInfo = AdapterInfo0#amqp_adapter_info{additional_info = [ + {channels, 1}, + {channel_max, 1}, + {frame_max, 0}, + {client_properties, + [{<<"product">>, longstr, <<"MQTT client">>}]} | Extra]}, + #proc_state{ unacked_pubs = gb_trees:empty(), + awaiting_ack = gb_trees:empty(), + message_id = 1, + subscriptions = #{}, + consumer_tags = {undefined, undefined}, + channels = {undefined, undefined}, + exchange = rabbit_mqtt_util:env(exchange), + socket = Socket, + adapter_info = AdapterInfo, + ssl_login_name = SSLLoginName, + send_fun = SendFun, + peer_addr = PeerAddr, + mqtt2amqp_fun = M2A, + amqp2mqtt_fun = A2M}. + +process_frame(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + PState = #proc_state{ connection = undefined } ) + when Type =/= ?CONNECT -> + {error, connect_expected, PState}; +process_frame(Frame = #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = Type }}, + PState) -> + case process_request(Type, Frame, PState) of + {ok, PState1} -> {ok, PState1, PState1#proc_state.connection}; + Ret -> Ret + end. + +add_client_id_to_adapter_info(ClientId, #amqp_adapter_info{additional_info = AdditionalInfo0} = AdapterInfo) -> + AdditionalInfo1 = [{variable_map, #{<<"client_id">> => ClientId}} + | AdditionalInfo0], + ClientProperties = proplists:get_value(client_properties, AdditionalInfo1, []) + ++ [{client_id, longstr, ClientId}], + AdditionalInfo2 = case lists:keysearch(client_properties, 1, AdditionalInfo1) of + {value, _} -> + lists:keyreplace(client_properties, + 1, + AdditionalInfo1, + {client_properties, ClientProperties}); + false -> + [{client_properties, ClientProperties} | AdditionalInfo1] + end, + AdapterInfo#amqp_adapter_info{additional_info = AdditionalInfo2}. + +process_request(?CONNECT, + #mqtt_frame{ variable = #mqtt_frame_connect{ + username = Username, + password = Password, + proto_ver = ProtoVersion, + clean_sess = CleanSess, + client_id = ClientId0, + keep_alive = Keepalive} = Var}, + PState0 = #proc_state{ ssl_login_name = SSLLoginName, + send_fun = SendFun, + adapter_info = AdapterInfo, + peer_addr = Addr}) -> + ClientId = case ClientId0 of + [] -> rabbit_mqtt_util:gen_client_id(); + [_|_] -> ClientId0 + end, + rabbit_log_connection:debug("Received a CONNECT, client ID: ~p (expanded to ~p), username: ~p, " + "clean session: ~p, protocol version: ~p, keepalive: ~p", + [ClientId0, ClientId, Username, CleanSess, ProtoVersion, Keepalive]), + AdapterInfo1 = add_client_id_to_adapter_info(rabbit_data_coercion:to_binary(ClientId), AdapterInfo), + PState1 = PState0#proc_state{adapter_info = AdapterInfo1}, + Ip = list_to_binary(inet:ntoa(Addr)), + {Return, PState5} = + case {lists:member(ProtoVersion, proplists:get_keys(?PROTOCOL_NAMES)), + ClientId0 =:= [] andalso CleanSess =:= false} of + {false, _} -> + {?CONNACK_PROTO_VER, PState1}; + {_, true} -> + {?CONNACK_INVALID_ID, PState1}; + _ -> + case creds(Username, Password, SSLLoginName) of + nocreds -> + rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt), + rabbit_log_connection:error("MQTT login failed: no credentials provided~n"), + {?CONNACK_CREDENTIALS, PState1}; + {invalid_creds, {undefined, Pass}} when is_list(Pass) -> + rabbit_core_metrics:auth_attempt_failed(Ip, <<>>, mqtt), + rabbit_log_connection:error("MQTT login failed: no username is provided"), + {?CONNACK_CREDENTIALS, PState1}; + {invalid_creds, {User, undefined}} when is_list(User) -> + rabbit_core_metrics:auth_attempt_failed(Ip, User, mqtt), + rabbit_log_connection:error("MQTT login failed for user '~p': no password provided", [User]), + {?CONNACK_CREDENTIALS, PState1}; + {UserBin, PassBin} -> + case process_login(UserBin, PassBin, ProtoVersion, PState1) of + connack_dup_auth -> + {SessionPresent0, PState2} = maybe_clean_sess(PState1), + {{?CONNACK_ACCEPT, SessionPresent0}, PState2}; + {?CONNACK_ACCEPT, Conn, VHost, AState} -> + case rabbit_mqtt_collector:register(ClientId, self()) of + {ok, Corr} -> + RetainerPid = rabbit_mqtt_retainer_sup:child_for_vhost(VHost), + link(Conn), + {ok, Ch} = amqp_connection:open_channel(Conn), + link(Ch), + amqp_channel:enable_delivery_flow_control(Ch), + Prefetch = rabbit_mqtt_util:env(prefetch), + #'basic.qos_ok'{} = amqp_channel:call(Ch, + #'basic.qos'{prefetch_count = Prefetch}), + rabbit_mqtt_reader:start_keepalive(self(), Keepalive), + PState3 = PState1#proc_state{ + will_msg = make_will_msg(Var), + clean_sess = CleanSess, + channels = {Ch, undefined}, + connection = Conn, + client_id = ClientId, + retainer_pid = RetainerPid, + auth_state = AState, + register_state = {pending, Corr}}, + {SessionPresent1, PState4} = maybe_clean_sess(PState3), + {{?CONNACK_ACCEPT, SessionPresent1}, PState4}; + %% e.g. this node was removed from the MQTT cluster members + {error, _} = Err -> + rabbit_log_connection:error("MQTT cannot accept a connection: " + "client ID tracker is unavailable: ~p", [Err]), + %% ignore all exceptions, we are shutting down + catch amqp_connection:close(Conn), + {?CONNACK_SERVER, PState1}; + {timeout, _} -> + rabbit_log_connection:error("MQTT cannot accept a connection: " + "client ID registration timed out"), + %% ignore all exceptions, we are shutting down + catch amqp_connection:close(Conn), + {?CONNACK_SERVER, PState1} + end; + ConnAck -> {ConnAck, PState1} + end + end + end, + {ReturnCode, SessionPresent} = case Return of + {?CONNACK_ACCEPT, Bool} -> {?CONNACK_ACCEPT, Bool}; + Other -> {Other, false} + end, + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?CONNACK}, + variable = #mqtt_frame_connack{ + session_present = SessionPresent, + return_code = ReturnCode}}, + PState5), + case ReturnCode of + ?CONNACK_ACCEPT -> {ok, PState5}; + ?CONNACK_CREDENTIALS -> {error, unauthenticated, PState5}; + ?CONNACK_AUTH -> {error, unauthorized, PState5}; + ?CONNACK_SERVER -> {error, unavailable, PState5}; + ?CONNACK_INVALID_ID -> {error, invalid_client_id, PState5}; + ?CONNACK_PROTO_VER -> {error, unsupported_protocol_version, PState5} + end; + +process_request(?PUBACK, + #mqtt_frame{ + variable = #mqtt_frame_publish{ message_id = MessageId }}, + #proc_state{ channels = {Channel, _}, + awaiting_ack = Awaiting } = PState) -> + %% tag can be missing because of bogus clients and QoS downgrades + case gb_trees:is_defined(MessageId, Awaiting) of + false -> + {ok, PState}; + true -> + Tag = gb_trees:get(MessageId, Awaiting), + amqp_channel:cast(Channel, #'basic.ack'{ delivery_tag = Tag }), + {ok, PState#proc_state{ awaiting_ack = gb_trees:delete(MessageId, Awaiting) }} + end; + +process_request(?PUBLISH, + Frame = #mqtt_frame{ + fixed = Fixed = #mqtt_frame_fixed{ qos = ?QOS_2 }}, + PState) -> + % Downgrade QOS_2 to QOS_1 + process_request(?PUBLISH, + Frame#mqtt_frame{ + fixed = Fixed#mqtt_frame_fixed{ qos = ?QOS_1 }}, + PState); +process_request(?PUBLISH, + #mqtt_frame{ + fixed = #mqtt_frame_fixed{ qos = Qos, + retain = Retain, + dup = Dup }, + variable = #mqtt_frame_publish{ topic_name = Topic, + message_id = MessageId }, + payload = Payload }, + PState = #proc_state{retainer_pid = RPid, + amqp2mqtt_fun = Amqp2MqttFun}) -> + check_publish(Topic, fun() -> + Msg = #mqtt_msg{retain = Retain, + qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload}, + Result = amqp_pub(Msg, PState), + case Retain of + false -> ok; + true -> hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, Msg) + end, + {ok, Result} + end, PState); + +process_request(?SUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ + message_id = SubscribeMsgId, + topic_table = Topics}, + payload = undefined}, + #proc_state{channels = {Channel, _}, + exchange = Exchange, + retainer_pid = RPid, + send_fun = SendFun, + message_id = StateMsgId, + mqtt2amqp_fun = Mqtt2AmqpFun} = PState0) -> + rabbit_log_connection:debug("Received a SUBSCRIBE for topic(s) ~p", [Topics]), + check_subscribe(Topics, fun() -> + {QosResponse, PState1} = + lists:foldl(fun (#mqtt_topic{name = TopicName, + qos = Qos}, {QosList, PState}) -> + SupportedQos = supported_subs_qos(Qos), + {Queue, #proc_state{subscriptions = Subs} = PState1} = + ensure_queue(SupportedQos, PState), + RoutingKey = Mqtt2AmqpFun(TopicName), + Binding = #'queue.bind'{ + queue = Queue, + exchange = Exchange, + routing_key = RoutingKey}, + #'queue.bind_ok'{} = amqp_channel:call(Channel, Binding), + SupportedQosList = case maps:find(TopicName, Subs) of + {ok, L} -> [SupportedQos|L]; + error -> [SupportedQos] + end, + {[SupportedQos | QosList], + PState1 #proc_state{ + subscriptions = + maps:put(TopicName, SupportedQosList, Subs)}} + end, {[], PState0}, Topics), + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{type = ?SUBACK}, + variable = #mqtt_frame_suback{ + message_id = SubscribeMsgId, + qos_table = QosResponse}}, PState1), + %% we may need to send up to length(Topics) messages. + %% if QoS is > 0 then we need to generate a message id, + %% and increment the counter. + StartMsgId = safe_max_id(SubscribeMsgId, StateMsgId), + N = lists:foldl(fun (Topic, Acc) -> + case maybe_send_retained_message(RPid, Topic, Acc, PState1) of + {true, X} -> Acc + X; + false -> Acc + end + end, StartMsgId, Topics), + {ok, PState1#proc_state{message_id = N}} + end, PState0); + +process_request(?UNSUBSCRIBE, + #mqtt_frame{ + variable = #mqtt_frame_subscribe{ message_id = MessageId, + topic_table = Topics }, + payload = undefined }, #proc_state{ channels = {Channel, _}, + exchange = Exchange, + client_id = ClientId, + subscriptions = Subs0, + send_fun = SendFun, + mqtt2amqp_fun = Mqtt2AmqpFun } = PState) -> + rabbit_log_connection:debug("Received an UNSUBSCRIBE for topic(s) ~p", [Topics]), + Queues = rabbit_mqtt_util:subcription_queue_name(ClientId), + Subs1 = + lists:foldl( + fun (#mqtt_topic{ name = TopicName }, Subs) -> + QosSubs = case maps:find(TopicName, Subs) of + {ok, Val} when is_list(Val) -> lists:usort(Val); + error -> [] + end, + RoutingKey = Mqtt2AmqpFun(TopicName), + lists:foreach( + fun (QosSub) -> + Queue = element(QosSub + 1, Queues), + Binding = #'queue.unbind'{ + queue = Queue, + exchange = Exchange, + routing_key = RoutingKey}, + #'queue.unbind_ok'{} = amqp_channel:call(Channel, Binding) + end, QosSubs), + maps:remove(TopicName, Subs) + end, Subs0, Topics), + SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed { type = ?UNSUBACK }, + variable = #mqtt_frame_suback{ message_id = MessageId }}, + PState), + {ok, PState #proc_state{ subscriptions = Subs1 }}; + +process_request(?PINGREQ, #mqtt_frame{}, #proc_state{ send_fun = SendFun } = PState) -> + rabbit_log_connection:debug("Received a PINGREQ"), + SendFun(#mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PINGRESP }}, + PState), + rabbit_log_connection:debug("Sent a PINGRESP"), + {ok, PState}; + +process_request(?DISCONNECT, #mqtt_frame{}, PState) -> + rabbit_log_connection:debug("Received a DISCONNECT"), + {stop, PState}. + +hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, #mqtt_msg{payload = <<"">>}) -> + Topic1 = Amqp2MqttFun(Topic0), + rabbit_mqtt_retainer:clear(RetainerPid, Topic1), + ok; +hand_off_to_retainer(RetainerPid, Amqp2MqttFun, Topic0, Msg) -> + Topic1 = Amqp2MqttFun(Topic0), + rabbit_mqtt_retainer:retain(RetainerPid, Topic1, Msg), + ok. + +maybe_send_retained_message(RPid, #mqtt_topic{name = Topic0, qos = SubscribeQos}, MsgId, + #proc_state{ send_fun = SendFun, + amqp2mqtt_fun = Amqp2MqttFun } = PState) -> + Topic1 = Amqp2MqttFun(Topic0), + case rabbit_mqtt_retainer:fetch(RPid, Topic1) of + undefined -> false; + Msg -> + %% calculate effective QoS as the lower value of SUBSCRIBE frame QoS + %% and retained message QoS. The spec isn't super clear on this, we + %% do what Mosquitto does, per user feedback. + Qos = erlang:min(SubscribeQos, Msg#mqtt_msg.qos), + Id = case Qos of + ?QOS_0 -> undefined; + ?QOS_1 -> MsgId + end, + SendFun(#mqtt_frame{fixed = #mqtt_frame_fixed{ + type = ?PUBLISH, + qos = Qos, + dup = false, + retain = Msg#mqtt_msg.retain + }, variable = #mqtt_frame_publish{ + message_id = Id, + topic_name = Topic1 + }, + payload = Msg#mqtt_msg.payload}, PState), + case Qos of + ?QOS_0 -> false; + ?QOS_1 -> {true, 1} + end + end. + +amqp_callback({#'basic.deliver'{ consumer_tag = ConsumerTag, + delivery_tag = DeliveryTag, + routing_key = RoutingKey }, + #amqp_msg{ props = #'P_basic'{ headers = Headers }, + payload = Payload }, + DeliveryCtx} = Delivery, + #proc_state{ channels = {Channel, _}, + awaiting_ack = Awaiting, + message_id = MsgId, + send_fun = SendFun, + amqp2mqtt_fun = Amqp2MqttFun } = PState) -> + amqp_channel:notify_received(DeliveryCtx), + case {delivery_dup(Delivery), delivery_qos(ConsumerTag, Headers, PState)} of + {true, {?QOS_0, ?QOS_1}} -> + amqp_channel:cast( + Channel, #'basic.ack'{ delivery_tag = DeliveryTag }), + {ok, PState}; + {true, {?QOS_0, ?QOS_0}} -> + {ok, PState}; + {Dup, {DeliveryQos, _SubQos} = Qos} -> + TopicName = Amqp2MqttFun(RoutingKey), + SendFun( + #mqtt_frame{ fixed = #mqtt_frame_fixed{ + type = ?PUBLISH, + qos = DeliveryQos, + dup = Dup }, + variable = #mqtt_frame_publish{ + message_id = + case DeliveryQos of + ?QOS_0 -> undefined; + ?QOS_1 -> MsgId + end, + topic_name = TopicName }, + payload = Payload}, PState), + case Qos of + {?QOS_0, ?QOS_0} -> + {ok, PState}; + {?QOS_1, ?QOS_1} -> + Awaiting1 = gb_trees:insert(MsgId, DeliveryTag, Awaiting), + PState1 = PState#proc_state{ awaiting_ack = Awaiting1 }, + PState2 = next_msg_id(PState1), + {ok, PState2}; + {?QOS_0, ?QOS_1} -> + amqp_channel:cast( + Channel, #'basic.ack'{ delivery_tag = DeliveryTag }), + {ok, PState} + end + end; + +amqp_callback(#'basic.ack'{ multiple = true, delivery_tag = Tag } = Ack, + PState = #proc_state{ unacked_pubs = UnackedPubs, + send_fun = SendFun }) -> + case gb_trees:size(UnackedPubs) > 0 andalso + gb_trees:take_smallest(UnackedPubs) of + {TagSmall, MsgId, UnackedPubs1} when TagSmall =< Tag -> + SendFun( + #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ message_id = MsgId }}, + PState), + amqp_callback(Ack, PState #proc_state{ unacked_pubs = UnackedPubs1 }); + _ -> + {ok, PState} + end; + +amqp_callback(#'basic.ack'{ multiple = false, delivery_tag = Tag }, + PState = #proc_state{ unacked_pubs = UnackedPubs, + send_fun = SendFun }) -> + SendFun( + #mqtt_frame{ fixed = #mqtt_frame_fixed{ type = ?PUBACK }, + variable = #mqtt_frame_publish{ + message_id = gb_trees:get( + Tag, UnackedPubs) }}, PState), + {ok, PState #proc_state{ unacked_pubs = gb_trees:delete(Tag, UnackedPubs) }}. + +delivery_dup({#'basic.deliver'{ redelivered = Redelivered }, + #amqp_msg{ props = #'P_basic'{ headers = Headers }}, + _DeliveryCtx}) -> + case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-dup">>) of + undefined -> Redelivered; + {bool, Dup} -> Redelivered orelse Dup + end. + +ensure_valid_mqtt_message_id(Id) when Id >= 16#ffff -> + 1; +ensure_valid_mqtt_message_id(Id) -> + Id. + +safe_max_id(Id0, Id1) -> + ensure_valid_mqtt_message_id(erlang:max(Id0, Id1)). + +next_msg_id(PState = #proc_state{ message_id = MsgId0 }) -> + MsgId1 = ensure_valid_mqtt_message_id(MsgId0 + 1), + PState#proc_state{ message_id = MsgId1 }. + +%% decide at which qos level to deliver based on subscription +%% and the message publish qos level. non-MQTT publishes are +%% assumed to be qos 1, regardless of delivery_mode. +delivery_qos(Tag, _Headers, #proc_state{ consumer_tags = {Tag, _} }) -> + {?QOS_0, ?QOS_0}; +delivery_qos(Tag, Headers, #proc_state{ consumer_tags = {_, Tag} }) -> + case rabbit_mqtt_util:table_lookup(Headers, <<"x-mqtt-publish-qos">>) of + {byte, Qos} -> {lists:min([Qos, ?QOS_1]), ?QOS_1}; + undefined -> {?QOS_1, ?QOS_1} + end. + +maybe_clean_sess(PState = #proc_state { clean_sess = false, + connection = Conn, + client_id = ClientId }) -> + SessionPresent = session_present(Conn, ClientId), + {_Queue, PState1} = ensure_queue(?QOS_1, PState), + {SessionPresent, PState1}; +maybe_clean_sess(PState = #proc_state { clean_sess = true, + connection = Conn, + client_id = ClientId }) -> + {_, Queue} = rabbit_mqtt_util:subcription_queue_name(ClientId), + {ok, Channel} = amqp_connection:open_channel(Conn), + ok = try amqp_channel:call(Channel, #'queue.delete'{ queue = Queue }) of + #'queue.delete_ok'{} -> ok + catch + exit:_Error -> ok + after + amqp_channel:close(Channel) + end, + {false, PState}. + +session_present(Conn, ClientId) -> + {_, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId), + Declare = #'queue.declare'{queue = QueueQ1, + passive = true}, + {ok, Channel} = amqp_connection:open_channel(Conn), + try + amqp_channel:call(Channel, Declare), + amqp_channel:close(Channel), + true + catch exit:{{shutdown, {server_initiated_close, ?NOT_FOUND, _Text}}, _} -> + false + end. + +make_will_msg(#mqtt_frame_connect{ will_flag = false }) -> + undefined; +make_will_msg(#mqtt_frame_connect{ will_retain = Retain, + will_qos = Qos, + will_topic = Topic, + will_msg = Msg }) -> + #mqtt_msg{ retain = Retain, + qos = Qos, + topic = Topic, + dup = false, + payload = Msg }. + +process_login(_UserBin, _PassBin, _ProtoVersion, + #proc_state{channels = {Channel, _}, + peer_addr = Addr, + auth_state = #auth_state{username = Username, + vhost = VHost}}) when is_pid(Channel) -> + UsernameStr = rabbit_data_coercion:to_list(Username), + VHostStr = rabbit_data_coercion:to_list(VHost), + rabbit_core_metrics:auth_attempt_failed(list_to_binary(inet:ntoa(Addr)), Username, mqtt), + rabbit_log_connection:warning("MQTT detected duplicate connect/login attempt for user ~p, vhost ~p", + [UsernameStr, VHostStr]), + connack_dup_auth; +process_login(UserBin, PassBin, ProtoVersion, + #proc_state{channels = {undefined, undefined}, + socket = Sock, + adapter_info = AdapterInfo, + ssl_login_name = SslLoginName, + peer_addr = Addr}) -> + {ok, {_, _, _, ToPort}} = rabbit_net:socket_ends(Sock, inbound), + {VHostPickedUsing, {VHost, UsernameBin}} = get_vhost(UserBin, SslLoginName, ToPort), + rabbit_log_connection:info( + "MQTT vhost picked using ~s~n", + [human_readable_vhost_lookup_strategy(VHostPickedUsing)]), + RemoteAddress = list_to_binary(inet:ntoa(Addr)), + case rabbit_vhost:exists(VHost) of + true -> + case amqp_connection:start(#amqp_params_direct{ + username = UsernameBin, + password = PassBin, + virtual_host = VHost, + adapter_info = set_proto_version(AdapterInfo, ProtoVersion)}) of + {ok, Connection} -> + case rabbit_access_control:check_user_loopback(UsernameBin, Addr) of + ok -> + rabbit_core_metrics:auth_attempt_succeeded(RemoteAddress, UsernameBin, + mqtt), + [{internal_user, InternalUser}] = amqp_connection:info( + Connection, [internal_user]), + {?CONNACK_ACCEPT, Connection, VHost, + #auth_state{user = InternalUser, + username = UsernameBin, + vhost = VHost}}; + not_allowed -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, + mqtt), + amqp_connection:close(Connection), + rabbit_log_connection:warning( + "MQTT login failed for ~p access_refused " + "(access must be from localhost)~n", + [binary_to_list(UsernameBin)]), + ?CONNACK_AUTH + end; + {error, {auth_failure, Explanation}} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), + rabbit_log_connection:error("MQTT login failed for user '~p' auth_failure: ~s~n", + [binary_to_list(UserBin), Explanation]), + ?CONNACK_CREDENTIALS; + {error, access_refused} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), + rabbit_log_connection:warning("MQTT login failed for user '~p': access_refused " + "(vhost access not allowed)~n", + [binary_to_list(UserBin)]), + ?CONNACK_AUTH; + {error, not_allowed} -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), + %% when vhost allowed for TLS connection + rabbit_log_connection:warning("MQTT login failed for ~p access_refused " + "(vhost access not allowed)~n", + [binary_to_list(UserBin)]), + ?CONNACK_AUTH + end; + false -> + rabbit_core_metrics:auth_attempt_failed(RemoteAddress, UsernameBin, mqtt), + rabbit_log_connection:error("MQTT login failed for user '~p' auth_failure: vhost ~s does not exist~n", + [binary_to_list(UserBin), VHost]), + ?CONNACK_CREDENTIALS + end. + +get_vhost(UserBin, none, Port) -> + get_vhost_no_ssl(UserBin, Port); +get_vhost(UserBin, undefined, Port) -> + get_vhost_no_ssl(UserBin, Port); +get_vhost(UserBin, SslLogin, Port) -> + get_vhost_ssl(UserBin, SslLogin, Port). + +get_vhost_no_ssl(UserBin, Port) -> + case vhost_in_username(UserBin) of + true -> + {vhost_in_username_or_default, get_vhost_username(UserBin)}; + false -> + PortVirtualHostMapping = rabbit_runtime_parameters:value_global( + mqtt_port_to_vhost_mapping + ), + case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of + undefined -> + {default_vhost, {rabbit_mqtt_util:env(vhost), UserBin}}; + VHost -> + {port_to_vhost_mapping, {VHost, UserBin}} + end + end. + +get_vhost_ssl(UserBin, SslLoginName, Port) -> + UserVirtualHostMapping = rabbit_runtime_parameters:value_global( + mqtt_default_vhosts + ), + case get_vhost_from_user_mapping(SslLoginName, UserVirtualHostMapping) of + undefined -> + PortVirtualHostMapping = rabbit_runtime_parameters:value_global( + mqtt_port_to_vhost_mapping + ), + case get_vhost_from_port_mapping(Port, PortVirtualHostMapping) of + undefined -> + {vhost_in_username_or_default, get_vhost_username(UserBin)}; + VHostFromPortMapping -> + {port_to_vhost_mapping, {VHostFromPortMapping, UserBin}} + end; + VHostFromCertMapping -> + {cert_to_vhost_mapping, {VHostFromCertMapping, UserBin}} + end. + +vhost_in_username(UserBin) -> + case application:get_env(?APP, ignore_colons_in_username) of + {ok, true} -> false; + _ -> + %% split at the last colon, disallowing colons in username + case re:split(UserBin, ":(?!.*?:)") of + [_, _] -> true; + [UserBin] -> false + end + end. + +get_vhost_username(UserBin) -> + Default = {rabbit_mqtt_util:env(vhost), UserBin}, + case application:get_env(?APP, ignore_colons_in_username) of + {ok, true} -> Default; + _ -> + %% split at the last colon, disallowing colons in username + case re:split(UserBin, ":(?!.*?:)") of + [Vhost, UserName] -> {Vhost, UserName}; + [UserBin] -> Default + end + end. + +get_vhost_from_user_mapping(_User, not_found) -> + undefined; +get_vhost_from_user_mapping(User, Mapping) -> + M = rabbit_data_coercion:to_proplist(Mapping), + case rabbit_misc:pget(User, M) of + undefined -> + undefined; + VHost -> + VHost + end. + +get_vhost_from_port_mapping(_Port, not_found) -> + undefined; +get_vhost_from_port_mapping(Port, Mapping) -> + M = rabbit_data_coercion:to_proplist(Mapping), + Res = case rabbit_misc:pget(rabbit_data_coercion:to_binary(Port), M) of + undefined -> + undefined; + VHost -> + VHost + end, + Res. + +human_readable_vhost_lookup_strategy(vhost_in_username_or_default) -> + "vhost in username or default"; +human_readable_vhost_lookup_strategy(port_to_vhost_mapping) -> + "MQTT port to vhost mapping"; +human_readable_vhost_lookup_strategy(cert_to_vhost_mapping) -> + "client certificate to vhost mapping"; +human_readable_vhost_lookup_strategy(default_vhost) -> + "plugin configuration or default"; +human_readable_vhost_lookup_strategy(Val) -> + atom_to_list(Val). + +creds(User, Pass, SSLLoginName) -> + DefaultUser = rabbit_mqtt_util:env(default_user), + DefaultPass = rabbit_mqtt_util:env(default_pass), + {ok, Anon} = application:get_env(?APP, allow_anonymous), + {ok, TLSAuth} = application:get_env(?APP, ssl_cert_login), + HaveDefaultCreds = Anon =:= true andalso + is_binary(DefaultUser) andalso + is_binary(DefaultPass), + + CredentialsProvided = User =/= undefined orelse + Pass =/= undefined, + + CorrectCredentials = is_list(User) andalso + is_list(Pass), + + SSLLoginProvided = TLSAuth =:= true andalso + SSLLoginName =/= none, + + case {CredentialsProvided, CorrectCredentials, SSLLoginProvided, HaveDefaultCreds} of + %% Username and password take priority + {true, true, _, _} -> {list_to_binary(User), + list_to_binary(Pass)}; + %% Either username or password is provided + {true, false, _, _} -> {invalid_creds, {User, Pass}}; + %% rabbitmq_mqtt.ssl_cert_login is true. SSL user name provided. + %% Authenticating using username only. + {false, false, true, _} -> {SSLLoginName, none}; + %% Anonymous connection uses default credentials + {false, false, false, true} -> {DefaultUser, DefaultPass}; + _ -> nocreds + end. + +supported_subs_qos(?QOS_0) -> ?QOS_0; +supported_subs_qos(?QOS_1) -> ?QOS_1; +supported_subs_qos(?QOS_2) -> ?QOS_1. + +delivery_mode(?QOS_0) -> 1; +delivery_mode(?QOS_1) -> 2; +delivery_mode(?QOS_2) -> 2. + +%% different qos subscriptions are received in different queues +%% with appropriate durability and timeout arguments +%% this will lead to duplicate messages for overlapping subscriptions +%% with different qos values - todo: prevent duplicates +ensure_queue(Qos, #proc_state{ channels = {Channel, _}, + client_id = ClientId, + clean_sess = CleanSess, + consumer_tags = {TagQ0, TagQ1} = Tags} = PState) -> + {QueueQ0, QueueQ1} = rabbit_mqtt_util:subcription_queue_name(ClientId), + Qos1Args = case {rabbit_mqtt_util:env(subscription_ttl), CleanSess} of + {undefined, _} -> + []; + {Ms, false} when is_integer(Ms) -> + [{<<"x-expires">>, long, Ms}]; + _ -> + [] + end, + QueueSetup = + case {TagQ0, TagQ1, Qos} of + {undefined, _, ?QOS_0} -> + {QueueQ0, + #'queue.declare'{ queue = QueueQ0, + durable = false, + auto_delete = true }, + #'basic.consume'{ queue = QueueQ0, + no_ack = true }}; + {_, undefined, ?QOS_1} -> + {QueueQ1, + #'queue.declare'{ queue = QueueQ1, + durable = true, + %% Clean session means a transient connection, + %% translating into auto-delete. + %% + %% see rabbitmq/rabbitmq-mqtt#37 + auto_delete = CleanSess, + arguments = Qos1Args }, + #'basic.consume'{ queue = QueueQ1, + no_ack = false }}; + {_, _, ?QOS_0} -> + {exists, QueueQ0}; + {_, _, ?QOS_1} -> + {exists, QueueQ1} + end, + case QueueSetup of + {Queue, Declare, Consume} -> + #'queue.declare_ok'{} = amqp_channel:call(Channel, Declare), + #'basic.consume_ok'{ consumer_tag = Tag } = + amqp_channel:call(Channel, Consume), + {Queue, PState #proc_state{ consumer_tags = setelement(Qos+1, Tags, Tag) }}; + {exists, Q} -> + {Q, PState} + end. + +send_will(PState = #proc_state{will_msg = undefined}) -> + PState; + +send_will(PState = #proc_state{will_msg = WillMsg = #mqtt_msg{retain = Retain, + topic = Topic}, + retainer_pid = RPid, + channels = {ChQos0, ChQos1}, + amqp2mqtt_fun = Amqp2MqttFun}) -> + case check_topic_access(Topic, write, PState) of + ok -> + amqp_pub(WillMsg, PState), + case Retain of + false -> ok; + true -> + hand_off_to_retainer(RPid, Amqp2MqttFun, Topic, WillMsg) + end; + Error -> + rabbit_log:warning( + "Could not send last will: ~p~n", + [Error]) + end, + case ChQos1 of + undefined -> ok; + _ -> amqp_channel:close(ChQos1) + end, + case ChQos0 of + undefined -> ok; + _ -> amqp_channel:close(ChQos0) + end, + PState #proc_state{ channels = {undefined, undefined} }. + +amqp_pub(undefined, PState) -> + PState; + +%% set up a qos1 publishing channel if necessary +%% this channel will only be used for publishing, not consuming +amqp_pub(Msg = #mqtt_msg{ qos = ?QOS_1 }, + PState = #proc_state{ channels = {ChQos0, undefined}, + awaiting_seqno = undefined, + connection = Conn }) -> + {ok, Channel} = amqp_connection:open_channel(Conn), + #'confirm.select_ok'{} = amqp_channel:call(Channel, #'confirm.select'{}), + amqp_channel:register_confirm_handler(Channel, self()), + amqp_pub(Msg, PState #proc_state{ channels = {ChQos0, Channel}, + awaiting_seqno = 1 }); + +amqp_pub(#mqtt_msg{ qos = Qos, + topic = Topic, + dup = Dup, + message_id = MessageId, + payload = Payload }, + PState = #proc_state{ channels = {ChQos0, ChQos1}, + exchange = Exchange, + unacked_pubs = UnackedPubs, + awaiting_seqno = SeqNo, + mqtt2amqp_fun = Mqtt2AmqpFun }) -> + RoutingKey = Mqtt2AmqpFun(Topic), + Method = #'basic.publish'{ exchange = Exchange, + routing_key = RoutingKey }, + Headers = [{<<"x-mqtt-publish-qos">>, byte, Qos}, + {<<"x-mqtt-dup">>, bool, Dup}], + Msg = #amqp_msg{ props = #'P_basic'{ headers = Headers, + delivery_mode = delivery_mode(Qos)}, + payload = Payload }, + {UnackedPubs1, Ch, SeqNo1} = + case Qos =:= ?QOS_1 andalso MessageId =/= undefined of + true -> {gb_trees:enter(SeqNo, MessageId, UnackedPubs), ChQos1, + SeqNo + 1}; + false -> {UnackedPubs, ChQos0, SeqNo} + end, + amqp_channel:cast_flow(Ch, Method, Msg), + PState #proc_state{ unacked_pubs = UnackedPubs1, + awaiting_seqno = SeqNo1 }. + +adapter_info(Sock, ProtoName) -> + amqp_connection:socket_adapter_info(Sock, {ProtoName, "N/A"}). + +set_proto_version(AdapterInfo = #amqp_adapter_info{protocol = {Proto, _}}, Vsn) -> + AdapterInfo#amqp_adapter_info{protocol = {Proto, + human_readable_mqtt_version(Vsn)}}. + +human_readable_mqtt_version(3) -> + "3.1.0"; +human_readable_mqtt_version(4) -> + "3.1.1"; +human_readable_mqtt_version(_) -> + "N/A". + +serialise_and_send_to_client(Frame, #proc_state{ socket = Sock }) -> + try rabbit_net:port_command(Sock, rabbit_mqtt_frame:serialise(Frame)) of + Res -> + Res + catch _:Error -> + rabbit_log_connection:error("MQTT: a socket write failed, the socket might already be closed"), + rabbit_log_connection:debug("Failed to write to socket ~p, error: ~p, frame: ~p", + [Sock, Error, Frame]) + end. + +close_connection(PState = #proc_state{ connection = undefined }) -> + PState; +close_connection(PState = #proc_state{ connection = Connection, + client_id = ClientId }) -> + % todo: maybe clean session + case ClientId of + undefined -> ok; + _ -> + case rabbit_mqtt_collector:unregister(ClientId, self()) of + ok -> ok; + %% ignore as we are shutting down + {timeout, _} -> ok + end + end, + %% ignore noproc or other exceptions, we are shutting down + catch amqp_connection:close(Connection), + PState #proc_state{ channels = {undefined, undefined}, + connection = undefined }. + +handle_pre_hibernate() -> + erase(topic_permission_cache), + ok. + +handle_ra_event({applied, [{Corr, ok}]}, + PState = #proc_state{register_state = {pending, Corr}}) -> + %% success case - command was applied transition into registered state + PState#proc_state{register_state = registered}; +handle_ra_event({not_leader, Leader, Corr}, + PState = #proc_state{register_state = {pending, Corr}, + client_id = ClientId}) -> + %% retry command against actual leader + {ok, NewCorr} = rabbit_mqtt_collector:register(Leader, ClientId, self()), + PState#proc_state{register_state = {pending, NewCorr}}; +handle_ra_event(register_timeout, + PState = #proc_state{register_state = {pending, _Corr}, + client_id = ClientId}) -> + {ok, NewCorr} = rabbit_mqtt_collector:register(ClientId, self()), + PState#proc_state{register_state = {pending, NewCorr}}; +handle_ra_event(register_timeout, PState) -> + PState; +handle_ra_event(Evt, PState) -> + %% log these? + rabbit_log:debug("unhandled ra_event: ~w ~n", [Evt]), + PState. + +%% NB: check_*: MQTT spec says we should ack normally, ie pretend there +%% was no auth error, but here we are closing the connection with an error. This +%% is what happens anyway if there is an authorization failure at the AMQP 0-9-1 client level. + +check_publish(TopicName, Fn, PState) -> + case check_topic_access(TopicName, write, PState) of + ok -> Fn(); + _ -> {error, unauthorized, PState} + end. + +check_subscribe([], Fn, _) -> + Fn(); + +check_subscribe([#mqtt_topic{name = TopicName} | Topics], Fn, PState) -> + case check_topic_access(TopicName, read, PState) of + ok -> check_subscribe(Topics, Fn, PState); + _ -> {error, unauthorized, PState} + end. + +check_topic_access(TopicName, Access, + #proc_state{ + auth_state = #auth_state{user = User = #user{username = Username}, + vhost = VHost}, + exchange = Exchange, + client_id = ClientId, + mqtt2amqp_fun = Mqtt2AmqpFun }) -> + Cache = + case get(topic_permission_cache) of + undefined -> []; + Other -> Other + end, + + Key = {TopicName, Username, ClientId, VHost, Exchange, Access}, + case lists:member(Key, Cache) of + true -> + ok; + false -> + Resource = #resource{virtual_host = VHost, + kind = topic, + name = Exchange}, + + RoutingKey = Mqtt2AmqpFun(TopicName), + Context = #{routing_key => RoutingKey, + variable_map => #{ + <<"username">> => Username, + <<"vhost">> => VHost, + <<"client_id">> => rabbit_data_coercion:to_binary(ClientId) + } + }, + + try rabbit_access_control:check_topic_access(User, Resource, Access, Context) of + ok -> + CacheTail = lists:sublist(Cache, ?MAX_TOPIC_PERMISSION_CACHE_SIZE - 1), + put(topic_permission_cache, [Key | CacheTail]), + ok; + R -> + R + catch + _:{amqp_error, access_refused, Msg, _} -> + rabbit_log:error("operation resulted in an error (access_refused): ~p~n", [Msg]), + {error, access_refused}; + _:Error -> + rabbit_log:error("~p~n", [Error]), + {error, access_refused} + end + end. + +info(consumer_tags, #proc_state{consumer_tags = Val}) -> Val; +info(unacked_pubs, #proc_state{unacked_pubs = Val}) -> Val; +info(awaiting_ack, #proc_state{awaiting_ack = Val}) -> Val; +info(awaiting_seqno, #proc_state{awaiting_seqno = Val}) -> Val; +info(message_id, #proc_state{message_id = Val}) -> Val; +info(client_id, #proc_state{client_id = Val}) -> + rabbit_data_coercion:to_binary(Val); +info(clean_sess, #proc_state{clean_sess = Val}) -> Val; +info(will_msg, #proc_state{will_msg = Val}) -> Val; +info(channels, #proc_state{channels = Val}) -> Val; +info(exchange, #proc_state{exchange = Val}) -> Val; +info(adapter_info, #proc_state{adapter_info = Val}) -> Val; +info(ssl_login_name, #proc_state{ssl_login_name = Val}) -> Val; +info(retainer_pid, #proc_state{retainer_pid = Val}) -> Val; +info(user, #proc_state{auth_state = #auth_state{username = Val}}) -> Val; +info(vhost, #proc_state{auth_state = #auth_state{vhost = Val}}) -> Val; +info(host, #proc_state{adapter_info = #amqp_adapter_info{host = Val}}) -> Val; +info(port, #proc_state{adapter_info = #amqp_adapter_info{port = Val}}) -> Val; +info(peer_host, #proc_state{adapter_info = #amqp_adapter_info{peer_host = Val}}) -> Val; +info(peer_port, #proc_state{adapter_info = #amqp_adapter_info{peer_port = Val}}) -> Val; +info(protocol, #proc_state{adapter_info = #amqp_adapter_info{protocol = Val}}) -> + case Val of + {Proto, Version} -> {Proto, rabbit_data_coercion:to_binary(Version)}; + Other -> Other + end; +info(channels, PState) -> additional_info(channels, PState); +info(channel_max, PState) -> additional_info(channel_max, PState); +info(frame_max, PState) -> additional_info(frame_max, PState); +info(client_properties, PState) -> additional_info(client_properties, PState); +info(ssl, PState) -> additional_info(ssl, PState); +info(ssl_protocol, PState) -> additional_info(ssl_protocol, PState); +info(ssl_key_exchange, PState) -> additional_info(ssl_key_exchange, PState); +info(ssl_cipher, PState) -> additional_info(ssl_cipher, PState); +info(ssl_hash, PState) -> additional_info(ssl_hash, PState); +info(Other, _) -> throw({bad_argument, Other}). + + +additional_info(Key, + #proc_state{adapter_info = + #amqp_adapter_info{additional_info = AddInfo}}) -> + proplists:get_value(Key, AddInfo). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl new file mode 100644 index 0000000000..39c0761321 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_reader.erl @@ -0,0 +1,480 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_reader). + +%% Transitional step until we can require Erlang/OTP 21 and +%% use the now recommended try/catch syntax for obtaining the stack trace. +-compile(nowarn_deprecated_function). + +-behaviour(gen_server2). + +-export([start_link/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + code_change/3, terminate/2, handle_pre_hibernate/1]). + +-export([conserve_resources/3, start_keepalive/2, + close_connection/2]). + +-export([ssl_login_name/1]). +-export([info/2]). + +-include_lib("amqp_client/include/amqp_client.hrl"). +-include("rabbit_mqtt.hrl"). + +-define(SIMPLE_METRICS, [pid, recv_oct, send_oct, reductions]). +-define(OTHER_METRICS, [recv_cnt, send_cnt, send_pend, garbage_collection, state]). + +%%---------------------------------------------------------------------------- + +start_link(KeepaliveSup, Ref) -> + Pid = proc_lib:spawn_link(?MODULE, init, + [[KeepaliveSup, Ref]]), + + {ok, Pid}. + +conserve_resources(Pid, _, {_, Conserve, _}) -> + Pid ! {conserve_resources, Conserve}, + ok. + +info(Pid, InfoItems) -> + case InfoItems -- ?INFO_ITEMS of + [] -> gen_server2:call(Pid, {info, InfoItems}); + UnknownItems -> throw({bad_argument, UnknownItems}) + end. + +close_connection(Pid, Reason) -> + gen_server:cast(Pid, {close_connection, Reason}). + +%%---------------------------------------------------------------------------- + +init([KeepaliveSup, Ref]) -> + process_flag(trap_exit, true), + {ok, Sock} = rabbit_networking:handshake(Ref, + application:get_env(rabbitmq_mqtt, proxy_protocol, false)), + RealSocket = rabbit_net:unwrap_socket(Sock), + case rabbit_net:connection_string(Sock, inbound) of + {ok, ConnStr} -> + rabbit_log_connection:debug("MQTT accepting TCP connection ~p (~s)~n", [self(), ConnStr]), + rabbit_alarm:register( + self(), {?MODULE, conserve_resources, []}), + ProcessorState = rabbit_mqtt_processor:initial_state(Sock,ssl_login_name(RealSocket)), + gen_server2:enter_loop(?MODULE, [], + rabbit_event:init_stats_timer( + control_throttle( + #state{socket = RealSocket, + conn_name = ConnStr, + await_recv = false, + connection_state = running, + received_connect_frame = false, + keepalive = {none, none}, + keepalive_sup = KeepaliveSup, + conserve = false, + parse_state = rabbit_mqtt_frame:initial_state(), + proc_state = ProcessorState }), #state.stats_timer), + {backoff, 1000, 1000, 10000}); + {network_error, Reason} -> + rabbit_net:fast_close(RealSocket), + terminate({shutdown, Reason}, undefined); + {error, enotconn} -> + rabbit_net:fast_close(RealSocket), + terminate(shutdown, undefined); + {error, Reason} -> + rabbit_net:fast_close(RealSocket), + terminate({network_error, Reason}, undefined) + end. + +handle_call({info, InfoItems}, _From, State) -> + Infos = lists:map( + fun(InfoItem) -> + {InfoItem, info_internal(InfoItem, State)} + end, + InfoItems), + {reply, Infos, State}; + +handle_call(Msg, From, State) -> + {stop, {mqtt_unexpected_call, Msg, From}, State}. + +handle_cast(duplicate_id, + State = #state{ proc_state = PState, + conn_name = ConnName }) -> + rabbit_log_connection:warning("MQTT disconnecting client ~p with duplicate id '~s'~n", + [ConnName, rabbit_mqtt_processor:info(client_id, PState)]), + {stop, {shutdown, duplicate_id}, State}; + +handle_cast(decommission_node, + State = #state{ proc_state = PState, + conn_name = ConnName }) -> + rabbit_log_connection:warning("MQTT disconnecting client ~p with client ID '~s' as its node is about" + " to be decommissioned~n", + [ConnName, rabbit_mqtt_processor:info(client_id, PState)]), + {stop, {shutdown, decommission_node}, State}; + +handle_cast({close_connection, Reason}, + State = #state{conn_name = ConnName, proc_state = PState}) -> + rabbit_log_connection:warning("MQTT disconnecting client ~p with client ID '~s', reason: ~s", + [ConnName, rabbit_mqtt_processor:info(client_id, PState), Reason]), + {stop, {shutdown, server_initiated_close}, State}; + +handle_cast(Msg, State) -> + {stop, {mqtt_unexpected_cast, Msg}, State}. + +handle_info({#'basic.deliver'{}, #amqp_msg{}, _DeliveryCtx} = Delivery, + State = #state{ proc_state = ProcState }) -> + callback_reply(State, rabbit_mqtt_processor:amqp_callback(Delivery, + ProcState)); + +handle_info(#'basic.ack'{} = Ack, State = #state{ proc_state = ProcState }) -> + callback_reply(State, rabbit_mqtt_processor:amqp_callback(Ack, ProcState)); + +handle_info(#'basic.consume_ok'{}, State) -> + {noreply, State, hibernate}; + +handle_info(#'basic.cancel'{}, State) -> + {stop, {shutdown, subscription_cancelled}, State}; + +handle_info({'EXIT', _Conn, Reason}, State) -> + {stop, {connection_died, Reason}, State}; + +handle_info({Tag, Sock, Data}, + State = #state{ socket = Sock, connection_state = blocked }) + when Tag =:= tcp; Tag =:= ssl -> + {noreply, State#state{ deferred_recv = Data }, hibernate}; + +handle_info({Tag, Sock, Data}, + State = #state{ socket = Sock, connection_state = running }) + when Tag =:= tcp; Tag =:= ssl -> + process_received_bytes( + Data, control_throttle(State #state{ await_recv = false })); + +handle_info({Tag, Sock}, State = #state{socket = Sock}) + when Tag =:= tcp_closed; Tag =:= ssl_closed -> + network_error(closed, State); + +handle_info({Tag, Sock, Reason}, State = #state{socket = Sock}) + when Tag =:= tcp_error; Tag =:= ssl_error -> + network_error(Reason, State); + +handle_info({inet_reply, Sock, ok}, State = #state{socket = Sock}) -> + {noreply, State, hibernate}; + +handle_info({inet_reply, Sock, {error, Reason}}, State = #state{socket = Sock}) -> + network_error(Reason, State); + +handle_info({conserve_resources, Conserve}, State) -> + maybe_process_deferred_recv( + control_throttle(State #state{ conserve = Conserve })); + +handle_info({bump_credit, Msg}, State) -> + credit_flow:handle_bump_msg(Msg), + maybe_process_deferred_recv(control_throttle(State)); + +handle_info({start_keepalives, Keepalive}, + State = #state { keepalive_sup = KeepaliveSup, socket = Sock }) -> + %% Only the client has the responsibility for sending keepalives + SendFun = fun() -> ok end, + Parent = self(), + ReceiveFun = fun() -> Parent ! keepalive_timeout end, + Heartbeater = rabbit_heartbeat:start( + KeepaliveSup, Sock, 0, SendFun, Keepalive, ReceiveFun), + {noreply, State #state { keepalive = Heartbeater }}; + +handle_info(keepalive_timeout, State = #state {conn_name = ConnStr, + proc_state = PState}) -> + rabbit_log_connection:error("closing MQTT connection ~p (keepalive timeout)~n", [ConnStr]), + send_will_and_terminate(PState, {shutdown, keepalive_timeout}, State); + +handle_info(emit_stats, State) -> + {noreply, emit_stats(State), hibernate}; + +handle_info({ra_event, _From, Evt}, + #state{proc_state = PState} = State) -> + %% handle applied event to ensure registration command actually got applied + %% handle not_leader notification in case we send the command to a non-leader + PState1 = rabbit_mqtt_processor:handle_ra_event(Evt, PState), + {noreply, State#state{proc_state = PState1}, hibernate}; + +handle_info(Msg, State) -> + {stop, {mqtt_unexpected_msg, Msg}, State}. + +terminate(Reason, State) -> + maybe_emit_stats(State), + do_terminate(Reason, State). + +handle_pre_hibernate(State) -> + rabbit_mqtt_processor:handle_pre_hibernate(), + {hibernate, State}. + +do_terminate({network_error, {ssl_upgrade_error, closed}, ConnStr}, _State) -> + rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: connection closed~n", + [ConnStr]); + +do_terminate({network_error, + {ssl_upgrade_error, + {tls_alert, "handshake failure"}}, ConnStr}, _State) -> + log_tls_alert(handshake_failure, ConnStr); +do_terminate({network_error, + {ssl_upgrade_error, + {tls_alert, "unknown ca"}}, ConnStr}, _State) -> + log_tls_alert(unknown_ca, ConnStr); +do_terminate({network_error, + {ssl_upgrade_error, + {tls_alert, {Err, _}}}, ConnStr}, _State) -> + log_tls_alert(Err, ConnStr); +do_terminate({network_error, + {ssl_upgrade_error, + {tls_alert, Alert}}, ConnStr}, _State) -> + log_tls_alert(Alert, ConnStr); +do_terminate({network_error, {ssl_upgrade_error, Reason}, ConnStr}, _State) -> + rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: ~p~n", + [ConnStr, Reason]); + +do_terminate({network_error, Reason, ConnStr}, _State) -> + rabbit_log_connection:error("MQTT detected network error on ~s: ~p~n", + [ConnStr, Reason]); + +do_terminate({network_error, Reason}, _State) -> + rabbit_log_connection:error("MQTT detected network error: ~p~n", [Reason]); + +do_terminate(normal, #state{proc_state = ProcState, + conn_name = ConnName}) -> + rabbit_mqtt_processor:close_connection(ProcState), + rabbit_log_connection:info("closing MQTT connection ~p (~s)~n", [self(), ConnName]), + ok; + +do_terminate(_Reason, #state{proc_state = ProcState}) -> + rabbit_mqtt_processor:close_connection(ProcState), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +ssl_login_name(Sock) -> + case rabbit_net:peercert(Sock) of + {ok, C} -> case rabbit_ssl:peer_cert_auth_name(C) of + unsafe -> none; + not_found -> none; + Name -> Name + end; + {error, no_peercert} -> none; + nossl -> none + end. + +%%---------------------------------------------------------------------------- + +log_tls_alert(handshake_failure, ConnStr) -> + rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: handshake failure~n", + [ConnStr]); +log_tls_alert(unknown_ca, ConnStr) -> + rabbit_log_connection:error("MQTT detected TLS certificate verification error on ~s: alert 'unknown CA'~n", + [ConnStr]); +log_tls_alert(Alert, ConnStr) -> + rabbit_log_connection:error("MQTT detected TLS upgrade error on ~s: alert ~s~n", + [ConnStr, Alert]). + +log_new_connection(#state{conn_name = ConnStr, proc_state = PState}) -> + rabbit_log_connection:info("accepting MQTT connection ~p (~s, client id: ~s)~n", + [self(), ConnStr, rabbit_mqtt_processor:info(client_id, PState)]). + +process_received_bytes(<<>>, State = #state{proc_state = ProcState, + received_connect_frame = false}) -> + MqttConn = ProcState#proc_state.connection, + case MqttConn of + undefined -> ok; + _ -> log_new_connection(State) + end, + {noreply, ensure_stats_timer(State#state{ received_connect_frame = true }), hibernate}; +process_received_bytes(<<>>, State) -> + {noreply, ensure_stats_timer(State), hibernate}; +process_received_bytes(Bytes, + State = #state{ parse_state = ParseState, + proc_state = ProcState, + conn_name = ConnStr }) -> + case parse(Bytes, ParseState) of + {more, ParseState1} -> + {noreply, + ensure_stats_timer( State #state{ parse_state = ParseState1 }), + hibernate}; + {ok, Frame, Rest} -> + case rabbit_mqtt_processor:process_frame(Frame, ProcState) of + {ok, ProcState1, ConnPid} -> + PS = rabbit_mqtt_frame:initial_state(), + process_received_bytes( + Rest, + State #state{ parse_state = PS, + proc_state = ProcState1, + connection = ConnPid }); + %% PUBLISH and more + {error, unauthorized = Reason, ProcState1} -> + rabbit_log_connection:error("MQTT connection ~s is closing due to an authorization failure~n", [ConnStr]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + %% CONNECT frames only + {error, unauthenticated = Reason, ProcState1} -> + rabbit_log_connection:error("MQTT connection ~s is closing due to an authentication failure~n", [ConnStr]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + %% CONNECT frames only + {error, invalid_client_id = Reason, ProcState1} -> + rabbit_log_connection:error("MQTT cannot accept connection ~s: client uses an invalid ID~n", [ConnStr]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + %% CONNECT frames only + {error, unsupported_protocol_version = Reason, ProcState1} -> + rabbit_log_connection:error("MQTT cannot accept connection ~s: incompatible protocol version~n", [ConnStr]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + {error, unavailable = Reason, ProcState1} -> + rabbit_log_connection:error("MQTT cannot accept connection ~s due to an internal error or unavailable component~n", + [ConnStr]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + {error, Reason, ProcState1} -> + rabbit_log_connection:error("MQTT protocol error on connection ~s: ~p~n", + [ConnStr, Reason]), + {stop, {shutdown, Reason}, pstate(State, ProcState1)}; + {error, Error} -> + rabbit_log_connection:error("MQTT detected a framing error on connection ~s: ~p~n", + [ConnStr, Error]), + {stop, {shutdown, Error}, State}; + {stop, ProcState1} -> + {stop, normal, pstate(State, ProcState1)} + end; + {error, {cannot_parse, Error, Stacktrace}} -> + rabbit_log_connection:error("MQTT cannot parse a frame on connection '~s', unparseable payload: ~p, error: {~p, ~p} ~n", + [ConnStr, Bytes, Error, Stacktrace]), + {stop, {shutdown, Error}, State}; + {error, Error} -> + rabbit_log_connection:error("MQTT detected a framing error on connection ~s: ~p~n", + [ConnStr, Error]), + {stop, {shutdown, Error}, State} + end. + +callback_reply(State, {ok, ProcState}) -> + {noreply, pstate(State, ProcState), hibernate}; +callback_reply(State, {error, Reason, ProcState}) -> + {stop, Reason, pstate(State, ProcState)}. + +start_keepalive(_, 0 ) -> ok; +start_keepalive(Pid, Keepalive) -> Pid ! {start_keepalives, Keepalive}. + +pstate(State = #state {}, PState = #proc_state{}) -> + State #state{ proc_state = PState }. + +%%---------------------------------------------------------------------------- +parse(Bytes, ParseState) -> + try + rabbit_mqtt_frame:parse(Bytes, ParseState) + catch + _:Reason:Stacktrace -> + {error, {cannot_parse, Reason, Stacktrace}} + end. + +send_will_and_terminate(PState, State) -> + send_will_and_terminate(PState, {shutdown, conn_closed}, State). + +send_will_and_terminate(PState, Reason, State = #state{conn_name = ConnStr}) -> + rabbit_mqtt_processor:send_will(PState), + rabbit_log_connection:debug("MQTT: about to send will message (if any) on connection ~p", [ConnStr]), + % todo: flush channel after publish + {stop, Reason, State}. + +network_error(closed, + State = #state{conn_name = ConnStr, + proc_state = PState}) -> + MqttConn = PState#proc_state.connection, + Fmt = "MQTT connection ~p will terminate because peer closed TCP connection~n", + Args = [ConnStr], + case MqttConn of + undefined -> rabbit_log_connection:debug(Fmt, Args); + _ -> rabbit_log_connection:info(Fmt, Args) + end, + send_will_and_terminate(PState, State); + +network_error(Reason, + State = #state{conn_name = ConnStr, + proc_state = PState}) -> + rabbit_log_connection:info("MQTT detected network error for ~p: ~p~n", + [ConnStr, Reason]), + send_will_and_terminate(PState, State). + +run_socket(State = #state{ connection_state = blocked }) -> + State; +run_socket(State = #state{ deferred_recv = Data }) when Data =/= undefined -> + State; +run_socket(State = #state{ await_recv = true }) -> + State; +run_socket(State = #state{ socket = Sock }) -> + rabbit_net:setopts(Sock, [{active, once}]), + State#state{ await_recv = true }. + +control_throttle(State = #state{ connection_state = Flow, + conserve = Conserve }) -> + case {Flow, Conserve orelse credit_flow:blocked()} of + {running, true} -> ok = rabbit_heartbeat:pause_monitor( + State#state.keepalive), + State #state{ connection_state = blocked }; + {blocked, false} -> ok = rabbit_heartbeat:resume_monitor( + State#state.keepalive), + run_socket(State #state{ + connection_state = running }); + {_, _} -> run_socket(State) + end. + +maybe_process_deferred_recv(State = #state{ deferred_recv = undefined }) -> + {noreply, State, hibernate}; +maybe_process_deferred_recv(State = #state{ deferred_recv = Data, socket = Sock }) -> + handle_info({tcp, Sock, Data}, + State#state{ deferred_recv = undefined }). + +maybe_emit_stats(undefined) -> + ok; +maybe_emit_stats(State) -> + rabbit_event:if_enabled(State, #state.stats_timer, + fun() -> emit_stats(State) end). + +emit_stats(State=#state{connection = C}) when C == none; C == undefined -> + %% Avoid emitting stats on terminate when the connection has not yet been + %% established, as this causes orphan entries on the stats database + State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), + ensure_stats_timer(State1); +emit_stats(State) -> + [{_, Pid}, {_, Recv_oct}, {_, Send_oct}, {_, Reductions}] = I + = infos(?SIMPLE_METRICS, State), + Infos = infos(?OTHER_METRICS, State), + rabbit_core_metrics:connection_stats(Pid, Infos), + rabbit_core_metrics:connection_stats(Pid, Recv_oct, Send_oct, Reductions), + rabbit_event:notify(connection_stats, Infos ++ I), + State1 = rabbit_event:reset_stats_timer(State, #state.stats_timer), + ensure_stats_timer(State1). + +ensure_stats_timer(State = #state{}) -> + rabbit_event:ensure_stats_timer(State, #state.stats_timer, emit_stats). + +infos(Items, State) -> [{Item, info_internal(Item, State)} || Item <- Items]. + +info_internal(pid, State) -> info_internal(connection, State); +info_internal(SockStat, #state{socket = Sock}) when SockStat =:= recv_oct; + SockStat =:= recv_cnt; + SockStat =:= send_oct; + SockStat =:= send_cnt; + SockStat =:= send_pend -> + case rabbit_net:getstat(Sock, [SockStat]) of + {ok, [{_, N}]} when is_number(N) -> N; + _ -> 0 + end; +info_internal(state, State) -> info_internal(connection_state, State); +info_internal(garbage_collection, _State) -> + rabbit_misc:get_gc_info(self()); +info_internal(reductions, _State) -> + {reductions, Reductions} = erlang:process_info(self(), reductions), + Reductions; +info_internal(conn_name, #state{conn_name = Val}) -> + rabbit_data_coercion:to_binary(Val); +info_internal(connection_state, #state{received_connect_frame = false}) -> + starting; +info_internal(connection_state, #state{connection_state = Val}) -> + Val; +info_internal(connection, #state{connection = Val}) -> + Val; +info_internal(Key, #state{proc_state = ProcState}) -> + rabbit_mqtt_processor:info(Key, ProcState). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl new file mode 100644 index 0000000000..4b3ee95743 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store.erl @@ -0,0 +1,23 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retained_msg_store). + +-export([behaviour_info/1, table_name_for/1]). + +behaviour_info(callbacks) -> + [{new, 2}, + {recover, 2}, + {insert, 3}, + {lookup, 2}, + {delete, 2}, + {terminate, 1}]; +behaviour_info(_Other) -> + undefined. + +table_name_for(VHost) -> + rabbit_mqtt_util:vhost_name_to_table_name(VHost). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl new file mode 100644 index 0000000000..03c5942d35 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_dets.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retained_msg_store_dets). + +-behaviour(rabbit_mqtt_retained_msg_store). +-include("rabbit_mqtt.hrl"). + +-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). + +-record(store_state, { + %% DETS table name + table +}). + + +new(Dir, VHost) -> + Tid = open_table(Dir, VHost), + #store_state{table = Tid}. + +recover(Dir, VHost) -> + case open_table(Dir, VHost) of + {error, _} -> {error, uninitialized}; + {ok, Tid} -> {ok, #store_state{table = Tid}} + end. + +insert(Topic, Msg, #store_state{table = T}) -> + ok = dets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}). + +lookup(Topic, #store_state{table = T}) -> + case dets:lookup(T, Topic) of + [] -> not_found; + [Entry] -> Entry + end. + +delete(Topic, #store_state{table = T}) -> + ok = dets:delete(T, Topic). + +terminate(#store_state{table = T}) -> + ok = dets:close(T). + +open_table(Dir, VHost) -> + dets:open_file(rabbit_mqtt_retained_msg_store:table_name_for(VHost), + table_options(rabbit_mqtt_util:path_for(Dir, VHost, ".dets"))). + +table_options(Path) -> + [{type, set}, {keypos, #retained_message.topic}, + {file, Path}, {ram_file, true}, {repair, true}, + {auto_save, rabbit_misc:get_env(rabbit_mqtt, + retained_message_store_dets_sync_interval, 2000)}]. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl new file mode 100644 index 0000000000..9080a6f4cf --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_ets.erl @@ -0,0 +1,54 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retained_msg_store_ets). + +-behaviour(rabbit_mqtt_retained_msg_store). +-include("rabbit_mqtt.hrl"). + +-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). + +-record(store_state, { + %% ETS table ID + table, + %% where the table is stored on disk + filename +}). + + +new(Dir, VHost) -> + Path = rabbit_mqtt_util:path_for(Dir, VHost), + TableName = rabbit_mqtt_retained_msg_store:table_name_for(VHost), + file:delete(Path), + Tid = ets:new(TableName, [set, public, {keypos, #retained_message.topic}]), + #store_state{table = Tid, filename = Path}. + +recover(Dir, VHost) -> + Path = rabbit_mqtt_util:path_for(Dir, VHost), + case ets:file2tab(Path) of + {ok, Tid} -> file:delete(Path), + {ok, #store_state{table = Tid, filename = Path}}; + {error, _} -> {error, uninitialized} + end. + +insert(Topic, Msg, #store_state{table = T}) -> + true = ets:insert(T, #retained_message{topic = Topic, mqtt_msg = Msg}), + ok. + +lookup(Topic, #store_state{table = T}) -> + case ets:lookup(T, Topic) of + [] -> not_found; + [Entry] -> Entry + end. + +delete(Topic, #store_state{table = T}) -> + true = ets:delete(T, Topic), + ok. + +terminate(#store_state{table = T, filename = Path}) -> + ok = ets:tab2file(T, Path, + [{extended_info, [object_count]}]). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl new file mode 100644 index 0000000000..382ffbc63d --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retained_msg_store_noop.erl @@ -0,0 +1,31 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retained_msg_store_noop). + +-behaviour(rabbit_mqtt_retained_msg_store). +-include("rabbit_mqtt.hrl"). + +-export([new/2, recover/2, insert/3, lookup/2, delete/2, terminate/1]). + +new(_Dir, _VHost) -> + ok. + +recover(_Dir, _VHost) -> + {ok, ok}. + +insert(_Topic, _Msg, _State) -> + ok. + +lookup(_Topic, _State) -> + not_found. + +delete(_Topic, _State) -> + ok. + +terminate(_State) -> + ok. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl new file mode 100644 index 0000000000..2aa873ecfb --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer.erl @@ -0,0 +1,98 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retainer). + +-behaviour(gen_server2). +-include("rabbit_mqtt.hrl"). +-include("rabbit_mqtt_frame.hrl"). + +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, + terminate/2, code_change/3, start_link/2]). + +-export([retain/3, fetch/2, clear/2, store_module/0]). + +-define(SERVER, ?MODULE). +-define(TIMEOUT, 30000). + +-record(retainer_state, {store_mod, + store}). + +-spec retain(pid(), string(), mqtt_msg()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate} | + {stop, Reason :: term(), NewState :: term()}. + +%%---------------------------------------------------------------------------- + +start_link(RetainStoreMod, VHost) -> + gen_server2:start_link(?MODULE, [RetainStoreMod, VHost], []). + +retain(Pid, Topic, Msg = #mqtt_msg{retain = true}) -> + gen_server2:cast(Pid, {retain, Topic, Msg}); + +retain(_Pid, _Topic, Msg = #mqtt_msg{retain = false}) -> + throw({error, {retain_is_false, Msg}}). + +fetch(Pid, Topic) -> + gen_server2:call(Pid, {fetch, Topic}, ?TIMEOUT). + +clear(Pid, Topic) -> + gen_server2:cast(Pid, {clear, Topic}). + +%%---------------------------------------------------------------------------- + +init([StoreMod, VHost]) -> + process_flag(trap_exit, true), + State = case StoreMod:recover(store_dir(), VHost) of + {ok, Store} -> #retainer_state{store = Store, + store_mod = StoreMod}; + {error, _} -> #retainer_state{store = StoreMod:new(store_dir(), VHost), + store_mod = StoreMod} + end, + {ok, State}. + +store_module() -> + case application:get_env(rabbitmq_mqtt, retained_message_store) of + {ok, Mod} -> Mod; + undefined -> undefined + end. + +%%---------------------------------------------------------------------------- + +handle_cast({retain, Topic, Msg}, + State = #retainer_state{store = Store, store_mod = Mod}) -> + ok = Mod:insert(Topic, Msg, Store), + {noreply, State}; +handle_cast({clear, Topic}, + State = #retainer_state{store = Store, store_mod = Mod}) -> + ok = Mod:delete(Topic, Store), + {noreply, State}. + +handle_call({fetch, Topic}, _From, + State = #retainer_state{store = Store, store_mod = Mod}) -> + Reply = case Mod:lookup(Topic, Store) of + #retained_message{mqtt_msg = Msg} -> Msg; + not_found -> undefined + end, + {reply, Reply, State}. + +handle_info(stop, State) -> + {stop, normal, State}; + +handle_info(Info, State) -> + {stop, {unknown_info, Info}, State}. + +store_dir() -> + rabbit_mnesia:dir(). + +terminate(_Reason, #retainer_state{store = Store, store_mod = Mod}) -> + Mod:terminate(Store), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl new file mode 100644 index 0000000000..86b54ce3d7 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_retainer_sup.erl @@ -0,0 +1,60 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_retainer_sup). +-behaviour(supervisor2). + +-export([start_link/1, init/1, start_child/2,start_child/1, child_for_vhost/1, + delete_child/1]). + +-define(ENCODING, utf8). + +-spec start_child(binary()) -> supervisor2:startchild_ret(). +-spec start_child(term(), binary()) -> supervisor2:startchild_ret(). + +start_link(SupName) -> + supervisor2:start_link(SupName, ?MODULE, []). + +child_for_vhost(VHost) when is_binary(VHost) -> + case rabbit_mqtt_retainer_sup:start_child(VHost) of + {ok, Pid} -> Pid; + {error, {already_started, Pid}} -> Pid + end. + +start_child(VHost) when is_binary(VHost) -> + start_child(rabbit_mqtt_retainer:store_module(), VHost). + +start_child(RetainStoreMod, VHost) -> + supervisor2:start_child(?MODULE, + + {vhost_to_atom(VHost), + {rabbit_mqtt_retainer, start_link, [RetainStoreMod, VHost]}, + permanent, 60, worker, [rabbit_mqtt_retainer]}). + +delete_child(VHost) -> + Id = vhost_to_atom(VHost), + ok = supervisor2:terminate_child(?MODULE, Id), + ok = supervisor2:delete_child(?MODULE, Id). + +init([]) -> + Mod = rabbit_mqtt_retainer:store_module(), + rabbit_log:info("MQTT retained message store: ~p~n", + [Mod]), + {ok, {{one_for_one, 5, 5}, child_specs(Mod, rabbit_vhost:list_names())}}. + +child_specs(Mod, VHosts) -> + %% see start_child/2 + [{vhost_to_atom(V), + {rabbit_mqtt_retainer, start_link, [Mod, V]}, + permanent, infinity, worker, [rabbit_mqtt_retainer]} || V <- VHosts]. + +vhost_to_atom(VHost) -> + %% we'd like to avoid any conversion here because + %% this atom isn't meant to be human-readable, only + %% unique. This makes sure we don't get noisy process restarts + %% with really unusual vhost names used by various HTTP API test suites + rabbit_data_coercion:to_atom(VHost, latin1). diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl new file mode 100644 index 0000000000..c00be457d3 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_sup.erl @@ -0,0 +1,73 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_sup). +-behaviour(supervisor2). + +-include_lib("rabbit_common/include/rabbit.hrl"). + +-export([start_link/2, init/1, stop_listeners/0]). + +-define(TCP_PROTOCOL, 'mqtt'). +-define(TLS_PROTOCOL, 'mqtt/ssl'). + +start_link(Listeners, []) -> + supervisor2:start_link({local, ?MODULE}, ?MODULE, [Listeners]). + +init([{Listeners, SslListeners0}]) -> + NumTcpAcceptors = application:get_env(rabbitmq_mqtt, num_tcp_acceptors, 10), + {ok, SocketOpts} = application:get_env(rabbitmq_mqtt, tcp_listen_options), + {SslOpts, NumSslAcceptors, SslListeners} + = case SslListeners0 of + [] -> {none, 0, []}; + _ -> {rabbit_networking:ensure_ssl(), + application:get_env(rabbitmq_mqtt, num_ssl_acceptors, 10), + case rabbit_networking:poodle_check('MQTT') of + ok -> SslListeners0; + danger -> [] + end} + end, + {ok, {{one_for_all, 10, 10}, + [{rabbit_mqtt_retainer_sup, + {rabbit_mqtt_retainer_sup, start_link, [{local, rabbit_mqtt_retainer_sup}]}, + transient, ?SUPERVISOR_WAIT, supervisor, [rabbit_mqtt_retainer_sup]} | + listener_specs(fun tcp_listener_spec/1, + [SocketOpts, NumTcpAcceptors], Listeners) ++ + listener_specs(fun ssl_listener_spec/1, + [SocketOpts, SslOpts, NumSslAcceptors], SslListeners)]}}. + +stop_listeners() -> + rabbit_networking:stop_ranch_listener_of_protocol(?TCP_PROTOCOL), + rabbit_networking:stop_ranch_listener_of_protocol(?TLS_PROTOCOL), + ok. + +%% +%% Implementation +%% + +listener_specs(Fun, Args, Listeners) -> + [Fun([Address | Args]) || + Listener <- Listeners, + Address <- rabbit_networking:tcp_listener_addresses(Listener)]. + +tcp_listener_spec([Address, SocketOpts, NumAcceptors]) -> + rabbit_networking:tcp_listener_spec( + rabbit_mqtt_listener_sup, Address, SocketOpts, + transport(?TCP_PROTOCOL), rabbit_mqtt_connection_sup, [], + mqtt, NumAcceptors, "MQTT TCP listener"). + +ssl_listener_spec([Address, SocketOpts, SslOpts, NumAcceptors]) -> + rabbit_networking:tcp_listener_spec( + rabbit_mqtt_listener_sup, Address, SocketOpts ++ SslOpts, + transport(?TLS_PROTOCOL), rabbit_mqtt_connection_sup, [], + 'mqtt/ssl', NumAcceptors, "MQTT TLS listener"). + +transport(Protocol) -> + case Protocol of + ?TCP_PROTOCOL -> ranch_tcp; + ?TLS_PROTOCOL -> ranch_ssl + end. diff --git a/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl new file mode 100644 index 0000000000..0fbe7e8a85 --- /dev/null +++ b/deps/rabbitmq_mqtt/src/rabbit_mqtt_util.erl @@ -0,0 +1,139 @@ +%% This Source Code Form is subject to the terms of the Mozilla Public +%% License, v. 2.0. If a copy of the MPL was not distributed with this +%% file, You can obtain one at https://mozilla.org/MPL/2.0/. +%% +%% Copyright (c) 2007-2020 VMware, Inc. or its affiliates. All rights reserved. +%% + +-module(rabbit_mqtt_util). + +-include("rabbit_mqtt.hrl"). + +-export([subcription_queue_name/1, + gen_client_id/0, + env/1, + table_lookup/2, + path_for/2, + path_for/3, + vhost_name_to_table_name/1, + get_topic_translation_funs/0 + ]). + +-define(MAX_TOPIC_TRANSLATION_CACHE_SIZE, 12). + +subcription_queue_name(ClientId) -> + Base = "mqtt-subscription-" ++ ClientId ++ "qos", + {list_to_binary(Base ++ "0"), list_to_binary(Base ++ "1")}. + +cached(CacheName, Fun, Arg) -> + Cache = + case get(CacheName) of + undefined -> + []; + Other -> + Other + end, + case lists:keyfind(Arg, 1, Cache) of + {_, V} -> + V; + false -> + V = Fun(Arg), + CacheTail = lists:sublist(Cache, ?MAX_TOPIC_TRANSLATION_CACHE_SIZE - 1), + put(CacheName, [{Arg, V} | CacheTail]), + V + end. + +to_amqp(T0) -> + T1 = string:replace(T0, "/", ".", all), + T2 = string:replace(T1, "+", "*", all), + erlang:iolist_to_binary(T2). + +to_mqtt(T0) -> + T1 = string:replace(T0, "*", "+", all), + T2 = string:replace(T1, ".", "/", all), + erlang:iolist_to_binary(T2). + +%% amqp mqtt descr +%% * + match one topic level +%% # # match multiple topic levels +%% . / topic level separator +get_topic_translation_funs() -> + SparkplugB = env(sparkplug), + ToAmqpFun = fun(Topic) -> + cached(mta_cache, fun to_amqp/1, Topic) + end, + ToMqttFun = fun(Topic) -> + cached(atm_cache, fun to_mqtt/1, Topic) + end, + {M2AFun, A2MFun} = case SparkplugB of + true -> + {ok, M2A_SpRe} = re:compile("^sp[AB]v\\d+\\.\\d+/"), + {ok, A2M_SpRe} = re:compile("^sp[AB]v\\d+___\\d+\\."), + M2A = fun(T0) -> + case re:run(T0, M2A_SpRe) of + nomatch -> + ToAmqpFun(T0); + {match, _} -> + T1 = string:replace(T0, ".", "___", leading), + ToAmqpFun(T1) + end + end, + A2M = fun(T0) -> + case re:run(T0, A2M_SpRe) of + nomatch -> + ToMqttFun(T0); + {match, _} -> + T1 = ToMqttFun(T0), + T2 = string:replace(T1, "___", ".", leading), + erlang:iolist_to_binary(T2) + end + end, + {M2A, A2M}; + _ -> + M2A = fun(T) -> + ToAmqpFun(T) + end, + A2M = fun(T) -> + ToMqttFun(T) + end, + {M2A, A2M} + end, + {ok, {mqtt2amqp_fun, M2AFun}, {amqp2mqtt_fun, A2MFun}}. + +gen_client_id() -> + lists:nthtail(1, rabbit_guid:string(rabbit_guid:gen_secure(), [])). + +env(Key) -> + case application:get_env(rabbitmq_mqtt, Key) of + {ok, Val} -> coerce_env_value(Key, Val); + undefined -> undefined + end. + +%% TODO: move to rabbit_common +coerce_env_value(default_pass, Val) -> rabbit_data_coercion:to_binary(Val); +coerce_env_value(default_user, Val) -> rabbit_data_coercion:to_binary(Val); +coerce_env_value(exchange, Val) -> rabbit_data_coercion:to_binary(Val); +coerce_env_value(vhost, Val) -> rabbit_data_coercion:to_binary(Val); +coerce_env_value(_, Val) -> Val. + +table_lookup(undefined, _Key) -> + undefined; +table_lookup(Table, Key) -> + rabbit_misc:table_lookup(Table, Key). + +vhost_name_to_dir_name(VHost) -> + vhost_name_to_dir_name(VHost, ".ets"). +vhost_name_to_dir_name(VHost, Suffix) -> + <<Num:128>> = erlang:md5(VHost), + "mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num]) ++ Suffix. + +path_for(Dir, VHost) -> + filename:join(Dir, vhost_name_to_dir_name(VHost)). + +path_for(Dir, VHost, Suffix) -> + filename:join(Dir, vhost_name_to_dir_name(VHost, Suffix)). + + +vhost_name_to_table_name(VHost) -> + <<Num:128>> = erlang:md5(VHost), + list_to_atom("rabbit_mqtt_retained_" ++ rabbit_misc:format("~36.16.0b", [Num])). |