diff options
-rw-r--r-- | deps/rabbitmq_mqtt/src/mqtt_machine.erl | 26 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl | 28 |
2 files changed, 44 insertions, 10 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl index f9aebc3b4f..0da3d679ce 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -47,12 +47,25 @@ apply(_Meta, {register, ClientId, Pid}, {monitor, process, Pid}, {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}], - {Effects0, maps:remove(ClientId, Ids), Pids0}; - _ -> - Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, + Pids2 = case maps:take(OldPid, Pids0) of + error -> + Pids0; + {[ClientId], Pids1} -> + Pids1; + {ClientIds, Pids1} -> + Pids1#{ClientId => lists:delete(ClientId, ClientIds)} + end, + Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, + [ClientId], Pids2), + {Effects0, maps:remove(ClientId, Ids), Pids3}; + + {ok, Pid} -> + {[], Ids, Pids0}; + error -> + Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, [ClientId], Pids0), - Effects0 = [{monitor, process, Pid}], - {Effects0, Ids, Pids1} + Effects0 = [{monitor, process, Pid}], + {Effects0, Ids, Pids1} end, State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1), pids = Pids}, @@ -148,7 +161,8 @@ apply(Meta, {leave, Node}, #machine_state{client_ids = Ids, apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) -> Pids = maps:fold( fun(Id, Pid, Acc) -> - maps:update_with(Pid, fun(CIds) -> [Id | CIds] end, + maps:update_with(Pid, + fun(CIds) -> [Id | CIds] end, [Id], Acc) end, #{}, Ids), {#machine_state{client_ids = Ids, diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl index e82c50ef17..7ce08cbc2c 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl @@ -22,6 +22,7 @@ all() -> all_tests() -> [ basics, + machine_upgrade, many_downs ]. @@ -55,18 +56,37 @@ end_per_testcase(_TestCase, _Config) -> basics(_Config) -> S0 = mqtt_machine:init(#{}), ClientId = <<"id1">>, + OthPid = spawn(fun () -> ok end), {S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1), ?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1), - {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1), - ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2), - {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), + {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1), + ?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids} + when map_size(Ids) == 1, S2), + {S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3), - {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2), + {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4), ok. +machine_upgrade(_Config) -> + S0 = mqtt_machine_v0:init(#{}), + ClientId = <<"id1">>, + Self = self(), + {S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0), + ?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1), + {S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1), + ?assertMatch(#machine_state{client_ids = #{ClientId := Self}, + pids = #{Self := [ClientId]} = Pids} + when map_size(Pids) == 1, S2), + {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), + ?assertMatch(#machine_state{client_ids = Ids, + pids = Pids} + when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3), + + ok. + many_downs(_Config) -> S0 = mqtt_machine:init(#{}), Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)} |