diff options
| author | Daniil Fedotov <dfedotov@pivotal.io> | 2017-10-30 15:33:35 +0000 |
|---|---|---|
| committer | Daniil Fedotov <dfedotov@pivotal.io> | 2017-10-30 15:34:15 +0000 |
| commit | 3e6d1ad207b5dc5b5ab5fff39d0d82f0d45915c0 (patch) | |
| tree | c2a571d523de15257c6035bc925eb56023557c46 | |
| parent | c0dcb2b3ee4386258a8e3e278ebdd7a95b2fde2f (diff) | |
| download | rabbitmq-server-git-3e6d1ad207b5dc5b5ab5fff39d0d82f0d45915c0.tar.gz | |
Replace orddicts with maps
| -rw-r--r-- | src/gm.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_limiter.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_msg_store.erl | 24 | ||||
| -rw-r--r-- | src/rabbit_node_monitor.erl | 12 | ||||
| -rw-r--r-- | src/rabbit_priority_queue.erl | 34 | ||||
| -rw-r--r-- | src/rabbit_queue_consumers.erl | 6 | ||||
| -rw-r--r-- | src/rabbit_variable_queue.erl | 16 | ||||
| -rw-r--r-- | src/rabbit_version.erl | 6 |
8 files changed, 71 insertions, 72 deletions
diff --git a/src/gm.erl b/src/gm.erl index 0da190a57d..c53e9bb007 100644 --- a/src/gm.erl +++ b/src/gm.erl @@ -399,7 +399,6 @@ -define(FORCE_GC_TIMER, 250). -define(VERSION_START, 0). -define(SETS, ordsets). --define(DICT, orddict). -record(state, { self, @@ -824,8 +823,8 @@ handle_msg({catchup, Left, MembersStateLeft}, members_state = MembersState }) when MembersState =/= undefined -> MembersStateLeft1 = build_members_state(MembersStateLeft), - AllMembers = lists:usort(?DICT:fetch_keys(MembersState) ++ - ?DICT:fetch_keys(MembersStateLeft1)), + AllMembers = lists:usort(maps:keys(MembersState) ++ + maps:keys(MembersStateLeft1)), {MembersState1, Activity} = lists:foldl( fun (Id, MembersStateActivity) -> @@ -995,21 +994,21 @@ is_member_alias(Member, Self, View) -> dead_member_id({dead, Member}) -> Member. store_view_member(VMember = #view_member { id = Id }, {Ver, View}) -> - {Ver, ?DICT:store(Id, VMember, View)}. + {Ver, maps:put(Id, VMember, View)}. with_view_member(Fun, View, Id) -> store_view_member(Fun(fetch_view_member(Id, View)), View). -fetch_view_member(Id, {_Ver, View}) -> ?DICT:fetch(Id, View). +fetch_view_member(Id, {_Ver, View}) -> maps:get(Id, View). -find_view_member(Id, {_Ver, View}) -> ?DICT:find(Id, View). +find_view_member(Id, {_Ver, View}) -> maps:find(Id, View). -blank_view(Ver) -> {Ver, ?DICT:new()}. +blank_view(Ver) -> {Ver, maps:new()}. -alive_view_members({_Ver, View}) -> ?DICT:fetch_keys(View). +alive_view_members({_Ver, View}) -> maps:keys(View). all_known_members({_Ver, View}) -> - ?DICT:fold( + maps:fold( fun (Member, #view_member { aliases = Aliases }, Acc) -> ?SETS:to_list(Aliases) ++ [Member | Acc] end, [], View). @@ -1374,24 +1373,24 @@ with_member_acc(Fun, Id, {MembersState, Acc}) -> {store_member(Id, MemberState, MembersState), Acc1}. find_member_or_blank(Id, MembersState) -> - case ?DICT:find(Id, MembersState) of + case maps:find(Id, MembersState) of {ok, Result} -> Result; error -> blank_member() end. -erase_member(Id, MembersState) -> ?DICT:erase(Id, MembersState). +erase_member(Id, MembersState) -> maps:remove(Id, MembersState). blank_member() -> #member { pending_ack = queue:new(), last_pub = -1, last_ack = -1 }. -blank_member_state() -> ?DICT:new(). +blank_member_state() -> maps:new(). store_member(Id, MemberState, MembersState) -> - ?DICT:store(Id, MemberState, MembersState). + maps:put(Id, MemberState, MembersState). -prepare_members_state(MembersState) -> ?DICT:to_list(MembersState). +prepare_members_state(MembersState) -> maps:to_list(MembersState). -build_members_state(MembersStateList) -> ?DICT:from_list(MembersStateList). +build_members_state(MembersStateList) -> maps:from_list(MembersStateList). make_member(GroupName) -> {case dirty_read_group(GroupName) of diff --git a/src/rabbit_limiter.erl b/src/rabbit_limiter.erl index e81b10a555..93bacc568d 100644 --- a/src/rabbit_limiter.erl +++ b/src/rabbit_limiter.erl @@ -184,7 +184,7 @@ %% 'Notify' is a boolean that indicates whether a queue should be %% notified of a change in the limit or volume that may allow it to %% deliver more messages via the limiter's channel. - queues = orddict:new(), % QPid -> {MonitorRef, Notify} + queues = maps:new(), % QPid -> {MonitorRef, Notify} volume = 0}). %% mode is of type credit_mode() @@ -402,28 +402,28 @@ prefetch_limit_reached(#lim{prefetch_count = Limit, volume = Volume}) -> Limit =/= 0 andalso Volume >= Limit. remember_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:is_key(QPid, Queues) of + case maps:is_key(QPid, Queues) of false -> MRef = erlang:monitor(process, QPid), - State#lim{queues = orddict:store(QPid, {MRef, false}, Queues)}; + State#lim{queues = maps:put(QPid, {MRef, false}, Queues)}; true -> State end. forget_queue(QPid, State = #lim{queues = Queues}) -> - case orddict:find(QPid, Queues) of + case maps:find(QPid, Queues) of {ok, {MRef, _}} -> true = erlang:demonitor(MRef), - State#lim{queues = orddict:erase(QPid, Queues)}; + State#lim{queues = maps:remove(QPid, Queues)}; error -> State end. limit_queue(QPid, State = #lim{queues = Queues}) -> UpdateFun = fun ({MRef, _}) -> {MRef, true} end, - State#lim{queues = orddict:update(QPid, UpdateFun, Queues)}. + State#lim{queues = maps:update_with(QPid, UpdateFun, Queues)}. notify_queues(State = #lim{ch_pid = ChPid, queues = Queues}) -> {QList, NewQueues} = - orddict:fold(fun (_QPid, {_, false}, Acc) -> Acc; + maps:fold(fun (_QPid, {_, false}, Acc) -> Acc; (QPid, {MRef, true}, {L, D}) -> - {[QPid | L], orddict:store(QPid, {MRef, false}, D)} + {[QPid | L], maps:put(QPid, {MRef, false}, D)} end, {[], Queues}, Queues), case length(QList) of 0 -> ok; diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl index fe78075d0f..76976ad771 100644 --- a/src/rabbit_msg_store.erl +++ b/src/rabbit_msg_store.erl @@ -780,7 +780,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) -> sync_timer_ref = undefined, sum_valid_data = 0, sum_file_size = 0, - pending_gc_completion = orddict:new(), + pending_gc_completion = maps:new(), gc_pid = GCPid, file_handles_ets = FileHandlesEts, file_summary_ets = FileSummaryEts, @@ -1269,7 +1269,7 @@ contains_message(MsgId, From, gen_server2:reply(From, false), State; #msg_location { file = File } -> - case orddict:is_key(File, Pending) of + case maps:is_key(File, Pending) of true -> add_to_pending_gc_completion( {contains, MsgId, From}, File, State); false -> gen_server2:reply(From, true), @@ -1280,16 +1280,16 @@ contains_message(MsgId, From, add_to_pending_gc_completion( Op, File, State = #msstate { pending_gc_completion = Pending }) -> State #msstate { pending_gc_completion = - rabbit_misc:orddict_cons(File, Op, Pending) }. + rabbit_misc:maps_cons(File, Op, Pending) }. run_pending(Files, State) -> lists:foldl( fun (File, State1 = #msstate { pending_gc_completion = Pending }) -> - Pending1 = orddict:erase(File, Pending), + Pending1 = maps:remove(File, Pending), lists:foldl( fun run_pending_action/2, State1 #msstate { pending_gc_completion = Pending1 }, - lists:reverse(orddict:fetch(File, Pending))) + lists:reverse(maps:get(File, Pending))) end, State, Files). run_pending_action({read, MsgId, From}, State) -> @@ -1320,9 +1320,9 @@ adjust_valid_total_size(File, Delta, State = #msstate { [{#file_summary.valid_total_size, Delta}]), State #msstate { sum_valid_data = SumValid + Delta }. -orddict_store(Key, Val, Dict) -> - false = orddict:is_key(Key, Dict), - orddict:store(Key, Val, Dict). +maps_store(Key, Val, Dict) -> + false = maps:is_key(Key, Dict), + maps:put(Key, Val, Dict). update_pending_confirms(Fun, CRef, State = #msstate { clients = Clients, @@ -1860,7 +1860,7 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, %% complete traversal of FileSummaryEts. First = ets:first(FileSummaryEts), case First =:= '$end_of_table' orelse - orddict:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of + maps:size(Pending) >= ?MAXIMUM_SIMULTANEOUS_GC_FILES of true -> State; false -> @@ -1869,8 +1869,8 @@ maybe_compact(State = #msstate { sum_valid_data = SumValid, not_found -> State; {Src, Dst} -> - Pending1 = orddict_store(Dst, [], - orddict_store(Src, [], Pending)), + Pending1 = maps_store(Dst, [], + maps_store(Src, [], Pending)), State1 = close_handle(Src, close_handle(Dst, State)), true = ets:update_element(FileSummaryEts, Src, {#file_summary.locked, true}), @@ -1926,7 +1926,7 @@ delete_file_if_empty(File, State = #msstate { 0 -> true = ets:update_element(FileSummaryEts, File, {#file_summary.locked, true}), ok = rabbit_msg_store_gc:delete(GCPid, File), - Pending1 = orddict_store(File, [], Pending), + Pending1 = maps_store(File, [], Pending), close_handle(File, State #msstate { pending_gc_completion = Pending1 }); _ -> State diff --git a/src/rabbit_node_monitor.erl b/src/rabbit_node_monitor.erl index 8bf79f7130..565676af3c 100644 --- a/src/rabbit_node_monitor.erl +++ b/src/rabbit_node_monitor.erl @@ -350,7 +350,7 @@ init([]) -> subscribers = pmon:new(), partitions = [], guid = rabbit_guid:gen(), - node_guids = orddict:new(), + node_guids = maps:new(), autoheal = rabbit_autoheal:init()})}. handle_call(partitions, _From, State = #state{partitions = Partitions}) -> @@ -405,17 +405,17 @@ handle_cast({node_up, Node, NodeType, GUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> cast(Node, {announce_guid, node(), MyGUID}), - GUIDs1 = orddict:store(Node, GUID, GUIDs), + GUIDs1 = maps:put(Node, GUID, GUIDs), handle_cast({node_up, Node, NodeType}, State#state{node_guids = GUIDs1}); handle_cast({announce_guid, Node, GUID}, State = #state{node_guids = GUIDs}) -> - {noreply, State#state{node_guids = orddict:store(Node, GUID, GUIDs)}}; + {noreply, State#state{node_guids = maps:put(Node, GUID, GUIDs)}}; handle_cast({check_partial_partition, Node, Rep, NodeGUID, MyGUID, RepGUID}, State = #state{guid = MyGUID, node_guids = GUIDs}) -> case lists:member(Node, rabbit_mnesia:cluster_nodes(running)) andalso - orddict:find(Node, GUIDs) =:= {ok, NodeGUID} of + maps:find(Node, GUIDs) =:= {ok, NodeGUID} of true -> spawn_link( %%[1] fun () -> case rpc:call(Node, rabbit, is_running, []) of @@ -560,10 +560,10 @@ handle_info({nodedown, Node, Info}, State = #state{guid = MyGUID, cast(N, {check_partial_partition, Node, node(), DownGUID, CheckGUID, MyGUID}) end, - case orddict:find(Node, GUIDs) of + case maps:find(Node, GUIDs) of {ok, DownGUID} -> Alive = rabbit_mnesia:cluster_nodes(running) -- [node(), Node], - [case orddict:find(N, GUIDs) of + [case maps:find(N, GUIDs) of {ok, CheckGUID} -> Check(N, CheckGUID, DownGUID); error -> ok end || N <- Alive]; diff --git a/src/rabbit_priority_queue.erl b/src/rabbit_priority_queue.erl index 41e65e8a1f..9f9ef5adca 100644 --- a/src/rabbit_priority_queue.erl +++ b/src/rabbit_priority_queue.erl @@ -207,13 +207,13 @@ publish(Msg, MsgProps, IsDelivered, ChPid, Flow, ?passthrough1(publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS)). batch_publish(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_batch(Publishes, MaxP), + PubMap = partition_publish_batch(Publishes, MaxP), lists:foldl( fun ({Priority, Pubs}, St) -> pick1(fun (_P, BQSN) -> BQ:batch_publish(Pubs, ChPid, Flow, BQSN) end, Priority, St) - end, State, orddict:to_list(PubDict)); + end, State, maps:to_list(PubMap)); batch_publish(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> ?passthrough1(batch_publish(Publishes, ChPid, Flow, BQS)). @@ -229,7 +229,7 @@ publish_delivered(Msg, MsgProps, ChPid, Flow, ?passthrough2(publish_delivered(Msg, MsgProps, ChPid, Flow, BQS)). batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [{MaxP, _} |_]}) -> - PubDict = partition_publish_delivered_batch(Publishes, MaxP), + PubMap = partition_publish_delivered_batch(Publishes, MaxP), {PrioritiesAndAcks, State1} = lists:foldl( fun ({Priority, Pubs}, {PriosAndAcks, St}) -> @@ -241,7 +241,7 @@ batch_publish_delivered(Publishes, ChPid, Flow, State = #state{bq = BQ, bqss = [ {priority_on_acktags(P, AckTags), BQSN1} end, Priority, St), {[PriosAndAcks1 | PriosAndAcks], St1} - end, {[], State}, orddict:to_list(PubDict)), + end, {[], State}, maps:to_list(PubMap)), {lists:reverse(PrioritiesAndAcks), State1}; batch_publish_delivered(Publishes, ChPid, Flow, State = #passthrough{bq = BQ, bqs = BQS}) -> @@ -327,7 +327,7 @@ ackfold(MsgFun, Acc, State = #state{bq = BQ}, AckTags) -> AckTagsByPriority = partition_acktags(AckTags), fold2( fun (P, BQSN, AccN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, ATagsN} -> {AccN1, BQSN1} = BQ:ackfold(MsgFun, AccN, BQSN, ATagsN), {priority_on_acktags(P, AccN1), BQSN1}; @@ -439,7 +439,7 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, #state{bqss = [{MaxP, _} |_]}) -> MsgsByPriority = partition_publish_delivered_batch(Msgs, MaxP), lists:foldl(fun (Acks, MAs) -> {P, _AckTag} = hd(Acks), - Pubs = orddict:fetch(P, MsgsByPriority), + Pubs = maps:get(P, MsgsByPriority), MAs0 = zip_msgs_and_acks(Pubs, Acks), MAs ++ MAs0 end, Accumulator, AckTags); @@ -527,7 +527,7 @@ fold_min2(Fun, State) -> fold_by_acktags2(Fun, AckTags, State) -> AckTagsByPriority = partition_acktags(AckTags), fold_append2(fun (P, BQSN) -> - case orddict:find(P, AckTagsByPriority) of + case maps:find(P, AckTagsByPriority) of {ok, AckTagsN} -> Fun(AckTagsN, BQSN); error -> {[], BQSN} end @@ -597,11 +597,11 @@ partition_publishes(Publishes, ExtractMsg, MaxP) -> Partitioned = lists:foldl(fun (Pub, Dict) -> Msg = ExtractMsg(Pub), - rabbit_misc:orddict_cons(priority(Msg, MaxP), Pub, Dict) - end, orddict:new(), Publishes), - orddict:map(fun (_P, RevPubs) -> - lists:reverse(RevPubs) - end, Partitioned). + rabbit_misc:maps_cons(priority(Msg, MaxP), Pub, Dict) + end, maps:new(), Publishes), + maps:map(fun (_P, RevPubs) -> + lists:reverse(RevPubs) + end, Partitioned). priority_bq(Priority, [{MaxP, _} | _] = BQSs) -> @@ -625,14 +625,14 @@ add_maybe_infinity(infinity, _) -> infinity; add_maybe_infinity(_, infinity) -> infinity; add_maybe_infinity(A, B) -> A + B. -partition_acktags(AckTags) -> partition_acktags(AckTags, orddict:new()). +partition_acktags(AckTags) -> partition_acktags(AckTags, maps:new()). partition_acktags([], Partitioned) -> - orddict:map(fun (_P, RevAckTags) -> - lists:reverse(RevAckTags) - end, Partitioned); + maps:map(fun (_P, RevAckTags) -> + lists:reverse(RevAckTags) + end, Partitioned); partition_acktags([{P, AckTag} | Rest], Partitioned) -> - partition_acktags(Rest, rabbit_misc:orddict_cons(P, AckTag, Partitioned)). + partition_acktags(Rest, rabbit_misc:maps_cons(P, AckTag, Partitioned)). priority_on_acktags(P, AckTags) -> [case Tag of diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl index f13a46fcf3..0fe3065fe8 100644 --- a/src/rabbit_queue_consumers.erl +++ b/src/rabbit_queue_consumers.erl @@ -254,9 +254,9 @@ subtract_acks(ChPid, AckTags, State) -> not_found; C = #cr{acktags = ChAckTags, limiter = Lim} -> {CTagCounts, AckTags2} = subtract_acks( - AckTags, [], orddict:new(), ChAckTags), + AckTags, [], maps:new(), ChAckTags), {Unblocked, Lim2} = - orddict:fold( + maps:fold( fun (CTag, Count, {UnblockedN, LimN}) -> {Unblocked1, LimN1} = rabbit_limiter:ack_from_queue(LimN, CTag, Count), @@ -278,7 +278,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) -> case queue:out(AckQ) of {{value, {T, CTag}}, QTail} -> subtract_acks(TL, Prefix, - orddict:update_counter(CTag, 1, CTagCounts), QTail); + maps:update_with(CTag, fun (Old) -> Old + 1 end, 1, CTagCounts), QTail); {{value, V}, QTail} -> subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail); {empty, _} -> diff --git a/src/rabbit_variable_queue.erl b/src/rabbit_variable_queue.erl index 763e1efdad..49f4d8d3ed 100644 --- a/src/rabbit_variable_queue.erl +++ b/src/rabbit_variable_queue.erl @@ -1776,7 +1776,7 @@ remove_queue_entries(Q, DelsAndAcksFun, State = #vqstate{msg_store_clients = MSCState}) -> {MsgIdsByStore, Delivers, Acks, State1} = ?QUEUE:foldl(fun remove_queue_entries1/2, - {orddict:new(), [], [], State}, Q), + {maps:new(), [], [], State}, Q), remove_msgs_by_id(MsgIdsByStore, MSCState), DelsAndAcksFun(Delivers, Acks, State1). @@ -1786,7 +1786,7 @@ remove_queue_entries1( is_persistent = IsPersistent} = MsgStatus, {MsgIdsByStore, Delivers, Acks, State}) -> {case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, cons_if(IndexOnDisk andalso not IsDelivered, SeqId, Delivers), @@ -2143,27 +2143,27 @@ purge_pending_ack1(State = #vqstate { ram_pending_ack = RPA, qi_pending_ack = gb_trees:empty()}, {IndexOnDiskSeqIds, MsgIdsByStore, State1}. -%% MsgIdsByStore is an orddict with two keys: +%% MsgIdsByStore is an map with two keys: %% %% true: holds a list of Persistent Message Ids. %% false: holds a list of Transient Message Ids. %% -%% When we call orddict:to_list/1 we get two sets of msg ids, where +%% When we call maps:to_list/1 we get two sets of msg ids, where %% IsPersistent is either true for persistent messages or false for %% transient ones. The msg_store_remove/3 function takes this boolean %% flag to determine from which store the messages should be removed %% from. remove_msgs_by_id(MsgIdsByStore, MSCState) -> [ok = msg_store_remove(MSCState, IsPersistent, MsgIds) - || {IsPersistent, MsgIds} <- orddict:to_list(MsgIdsByStore)]. + || {IsPersistent, MsgIds} <- maps:to_list(MsgIdsByStore)]. remove_transient_msgs_by_id(MsgIdsByStore, MSCState) -> - case orddict:find(false, MsgIdsByStore) of + case maps:find(false, MsgIdsByStore) of error -> ok; {ok, MsgIds} -> ok = msg_store_remove(MSCState, false, MsgIds) end. -accumulate_ack_init() -> {[], orddict:new(), []}. +accumulate_ack_init() -> {[], maps:new(), []}. accumulate_ack(#msg_status { seq_id = SeqId, msg_id = MsgId, @@ -2173,7 +2173,7 @@ accumulate_ack(#msg_status { seq_id = SeqId, {IndexOnDiskSeqIdsAcc, MsgIdsByStore, AllMsgIds}) -> {cons_if(IndexOnDisk, SeqId, IndexOnDiskSeqIdsAcc), case MsgInStore of - true -> rabbit_misc:orddict_cons(IsPersistent, MsgId, MsgIdsByStore); + true -> rabbit_misc:maps_cons(IsPersistent, MsgId, MsgIdsByStore); false -> MsgIdsByStore end, [MsgId | AllMsgIds]}. diff --git a/src/rabbit_version.erl b/src/rabbit_version.erl index 3d69040b04..710e89922e 100644 --- a/src/rabbit_version.erl +++ b/src/rabbit_version.erl @@ -197,10 +197,10 @@ categorise_by_scope(Version) when is_list(Version) -> rabbit_misc:all_module_attributes(rabbit_upgrade), {Name, Scope, _Requires} <- Attributes, lists:member(Name, Version)], - orddict:to_list( + maps:to_list( lists:foldl(fun ({Scope, Name}, CatVersion) -> - rabbit_misc:orddict_cons(Scope, Name, CatVersion) - end, orddict:new(), Categorised)). + rabbit_misc:maps_cons(Scope, Name, CatVersion) + end, maps:new(), Categorised)). dir() -> rabbit_mnesia:dir(). |
