summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--deps/rabbitmq_mqtt/src/mqtt_machine.erl26
-rw-r--r--deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl28
2 files changed, 44 insertions, 10 deletions
diff --git a/deps/rabbitmq_mqtt/src/mqtt_machine.erl b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
index f9aebc3b4f..0da3d679ce 100644
--- a/deps/rabbitmq_mqtt/src/mqtt_machine.erl
+++ b/deps/rabbitmq_mqtt/src/mqtt_machine.erl
@@ -47,12 +47,25 @@ apply(_Meta, {register, ClientId, Pid},
{monitor, process, Pid},
{mod_call, ?MODULE, notify_connection,
[OldPid, duplicate_id]}],
- {Effects0, maps:remove(ClientId, Ids), Pids0};
- _ ->
- Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
+ Pids2 = case maps:take(OldPid, Pids0) of
+ error ->
+ Pids0;
+ {[ClientId], Pids1} ->
+ Pids1;
+ {ClientIds, Pids1} ->
+ Pids1#{ClientId => lists:delete(ClientId, ClientIds)}
+ end,
+ Pids3 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
+ [ClientId], Pids2),
+ {Effects0, maps:remove(ClientId, Ids), Pids3};
+
+ {ok, Pid} ->
+ {[], Ids, Pids0};
+ error ->
+ Pids1 = maps:update_with(Pid, fun(CIds) -> [ClientId | CIds] end,
[ClientId], Pids0),
- Effects0 = [{monitor, process, Pid}],
- {Effects0, Ids, Pids1}
+ Effects0 = [{monitor, process, Pid}],
+ {Effects0, Ids, Pids1}
end,
State = State0#machine_state{client_ids = maps:put(ClientId, Pid, Ids1),
pids = Pids},
@@ -148,7 +161,8 @@ apply(Meta, {leave, Node}, #machine_state{client_ids = Ids,
apply(_Meta, {machine_version, 0, 1}, {machine_state, Ids}) ->
Pids = maps:fold(
fun(Id, Pid, Acc) ->
- maps:update_with(Pid, fun(CIds) -> [Id | CIds] end,
+ maps:update_with(Pid,
+ fun(CIds) -> [Id | CIds] end,
[Id], Acc)
end, #{}, Ids),
{#machine_state{client_ids = Ids,
diff --git a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl
index e82c50ef17..7ce08cbc2c 100644
--- a/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl
+++ b/deps/rabbitmq_mqtt/test/mqtt_machine_SUITE.erl
@@ -22,6 +22,7 @@ all() ->
all_tests() ->
[
basics,
+ machine_upgrade,
many_downs
].
@@ -55,18 +56,37 @@ end_per_testcase(_TestCase, _Config) ->
basics(_Config) ->
S0 = mqtt_machine:init(#{}),
ClientId = <<"id1">>,
+ OthPid = spawn(fun () -> ok end),
{S1, ok, _} = mqtt_machine:apply(meta(1), {register, ClientId, self()}, S0),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S1),
?assertMatch(#machine_state{pids = Pids} when map_size(Pids) == 1, S1),
- {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, self()}, S1),
- ?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 1, S2),
- {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
+ {S2, ok, _} = mqtt_machine:apply(meta(2), {register, ClientId, OthPid}, S1),
+ ?assertMatch(#machine_state{client_ids = #{ClientId := OthPid} = Ids}
+ when map_size(Ids) == 1, S2),
+ {S3, ok, _} = mqtt_machine:apply(meta(3), {down, OthPid, noproc}, S2),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S3),
- {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, self()}, S2),
+ {S4, ok, _} = mqtt_machine:apply(meta(3), {unregister, ClientId, OthPid}, S2),
?assertMatch(#machine_state{client_ids = Ids} when map_size(Ids) == 0, S4),
ok.
+machine_upgrade(_Config) ->
+ S0 = mqtt_machine_v0:init(#{}),
+ ClientId = <<"id1">>,
+ Self = self(),
+ {S1, ok, _} = mqtt_machine_v0:apply(meta(1), {register, ClientId, self()}, S0),
+ ?assertMatch({machine_state, Ids} when map_size(Ids) == 1, S1),
+ {S2, ok, _} = mqtt_machine:apply(meta(2), {machine_version, 0, 1}, S1),
+ ?assertMatch(#machine_state{client_ids = #{ClientId := Self},
+ pids = #{Self := [ClientId]} = Pids}
+ when map_size(Pids) == 1, S2),
+ {S3, ok, _} = mqtt_machine:apply(meta(3), {down, self(), noproc}, S2),
+ ?assertMatch(#machine_state{client_ids = Ids,
+ pids = Pids}
+ when map_size(Ids) == 0 andalso map_size(Pids) == 0, S3),
+
+ ok.
+
many_downs(_Config) ->
S0 = mqtt_machine:init(#{}),
Clients = [{list_to_binary(integer_to_list(I)), spawn(fun() -> ok end)}