diff options
| author | Daniil Fedotov <hairyhum@gmail.com> | 2019-06-04 16:59:24 -0400 |
|---|---|---|
| committer | Daniil Fedotov <hairyhum@gmail.com> | 2019-06-04 16:59:24 -0400 |
| commit | 35377fc69906b7a507d0161d37f4b778dded97b8 (patch) | |
| tree | de57a380813512baf8b4c5c07620e1895e54dcfb | |
| parent | e5fd6b64c2aa7f82cc0213bea18c909dfbd741d4 (diff) | |
| download | rabbitmq-server-git-35377fc69906b7a507d0161d37f4b778dded97b8.tar.gz | |
Switch msg_id_to_channel from gb_tree to maps.
| -rw-r--r-- | src/rabbit_amqqueue_process.erl | 22 | ||||
| -rw-r--r-- | src/rabbit_mirror_queue_slave.erl | 4 |
2 files changed, 12 insertions, 14 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl index fbab825c76..aa60503a70 100644 --- a/src/rabbit_amqqueue_process.erl +++ b/src/rabbit_amqqueue_process.erl @@ -163,7 +163,7 @@ init_state(Q) -> has_had_consumers = false, consumers = rabbit_queue_consumers:new(), senders = pmon:new(delegate), - msg_id_to_channel = gb_trees:empty(), + msg_id_to_channel = #{}, status = running, args_policy_version = 0, overflow = 'drop-head', @@ -261,7 +261,7 @@ recovery_barrier(BarrierPid) -> -spec init_with_backing_queue_state (amqqueue:amqqueue(), atom(), tuple(), any(), - [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) -> + [rabbit_types:delivery()], pmon:pmon(), maps:map()) -> #q{}. init_with_backing_queue_state(Q, BQ, BQS, @@ -599,13 +599,14 @@ confirm_messages(MsgIds, MTC) -> {CMs, MTC1} = lists:foldl( fun(MsgId, {CMs, MTC0}) -> - case gb_trees:lookup(MsgId, MTC0) of - {value, {SenderPid, MsgSeqNo}} -> + case maps:get(MsgId, MTC0, none) of + none -> + {CMs, MTC0}; + {SenderPid, MsgSeqNo} -> {rabbit_misc:gb_trees_cons(SenderPid, MsgSeqNo, CMs), - gb_trees:delete(MsgId, MTC0)}; - none -> - {CMs, MTC0} + maps:remove(MsgId, MTC0)} + end end, {gb_trees:empty(), MTC}, MsgIds), rabbit_misc:gb_trees_foreach(fun rabbit_misc:confirm_to_sender/2, CMs), @@ -622,7 +623,7 @@ send_or_record_confirm(#delivery{confirm = true, State = #q{q = Q, msg_id_to_channel = MTC}) when ?amqqueue_is_durable(Q) -> - MTC1 = gb_trees:insert(MsgId, {SenderPid, MsgSeqNo}, MTC), + MTC1 = maps:put(MsgId, {SenderPid, MsgSeqNo}, MTC), {eventually, State#q{msg_id_to_channel = MTC1}}; send_or_record_confirm(#delivery{confirm = true, sender = SenderPid, @@ -811,10 +812,7 @@ send_reject_publish(#delivery{confirm = true, msg_id_to_channel = MTC}) -> gen_server2:cast(SenderPid, {reject_publish, MsgSeqNo, self()}), - MTC1 = case gb_trees:is_defined(MsgId, MTC) of - true -> gb_trees:delete(MsgId, MTC); - false -> MTC - end, + MTC1 = maps:remove(MsgId, MTC), BQS1 = BQ:discard(MsgId, SenderPid, Flow, BQS), State#q{ backing_queue_state = BQS1, msg_id_to_channel = MTC1 }; send_reject_publish(#delivery{confirm = false}, diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl index f7a122f98a..22df1751e5 100644 --- a/src/rabbit_mirror_queue_slave.erl +++ b/src/rabbit_mirror_queue_slave.erl @@ -717,10 +717,10 @@ promote_me(From, #state { q = Q0, QName, CPid, BQ, BQS, GM, AckTags, SS, MPids), MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) -> - gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0); + maps:put(MsgId, {ChPid, MsgSeqNo}, MTC0); (_Msgid, _Status, MTC0) -> MTC0 - end, gb_trees:empty(), MS), + end, #{}, MS), Deliveries = [promote_delivery(Delivery) || {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ), Delivery <- queue:to_list(PubQ)], |
