diff options
author | kjnilsson <knilsson@pivotal.io> | 2020-12-22 15:16:17 +0000 |
---|---|---|
committer | kjnilsson <knilsson@pivotal.io> | 2020-12-22 15:16:17 +0000 |
commit | 04a55e0ee685e0a3d4b303bbc07ba934c3714204 (patch) | |
tree | ca69757e5544b9ab4b27a17256238ba435ef010f | |
parent | 160e41687de73f44f6be4f1d46d27af413b3597c (diff) | |
download | rabbitmq-server-git-mqtt-machine-opt.tar.gz |
bug fixesmqtt-machine-opt
-rw-r--r-- | deps/rabbitmq_mqtt/src/mqtt_machine.erl | 26 | ||||
-rw-r--r-- | deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl | 28 |
2 files changed, 44 insertions, 10 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl index f9aebc3b4f..0da3d679ce 100644 --- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl +++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl @@ -47,12 +47,25 @@ apply(_Meta, {register, ClientId, Pid}, {monitor, process, Pid}, {mod_call, ?MODULE, notify_connection, [OldPid, duplicate_id]}], - {Effects0, maps:remove(ClientId, Ids), Pids0}; - _ -> - Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, + Pids2 = case maps:take(OldPid, Pids0) of + error -> + Pids0; + {[ClientId], Pids1} -> + Pids1; + {ClientIds, Pids1} -> + Pids1#{ClientId => lists:delete(ClientId, ClientIds)} + end, + Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, + [ClientId], Pids2), + {Effects0, maps:remove(ClientId, Ids), Pids3}; + + {ok, Pid} -> + {[], Ids, Pids0}; + error -> + Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end, [ClientId], Pids0), - Effects0 = [{monitor, process, Pid}], - {Effects0, Ids, Pids1} + Effects0 = [{monitor, process, Pid}], + {Effects0, Ids, Pids1} end, State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1), pids = Pids}, @@ -148,7 +161,8 @@ apply(Meta, {leave, Node}, #machine_state{client_ids = Ids, apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) -> Pids = maps:fold( fun(Id, Pid, Acc) -> - maps:update_with(Pid, fun(CIds) -> [Id | CIds] end, + maps:update_with(Pid, + fun(CIds) -> [Id | CIds] end, [Id], Acc) end, #{}, Ids), {#machine_state{client_ids = Ids, diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl index e82c50ef17..7ce08cbc2c 100644 --- a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl @@ -22,6 +22,7 @@ all() -> all_tests() -> [ basics, + machine_upgrade, many_downs ]. @@ -55,18 +56,37 @@ end_per_testcase(_TestCase, _Config) -> basics(_Config) -> S0 = mqtt_machine:init(#{}), ClientId = <<"id1">>, + OthPid = spawn(fun () -> ok end), {S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1), ?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1), - {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1), - ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2), - {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), + {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1), + ?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids} + when map_size(Ids) == 1, S2), + {S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3), - {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2), + {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2), ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4), ok. +machine_upgrade(_Config) -> + S0 = mqtt_machine_v0:init(#{}), + ClientId = <<"id1">>, + Self = self(), + {S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0), + ?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1), + {S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1), + ?assertMatch(#machine_state{client_ids = #{ClientId := Self}, + pids = #{Self := [ClientId]} = Pids} + when map_size(Pids) == 1, S2), + {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2), + ?assertMatch(#machine_state{client_ids = Ids, + pids = Pids} + when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3), + + ok. + many_downs(_Config) -> S0 = mqtt_machine:init(#{}), Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)} |