summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-04-11 17:24:05 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-04-11 17:24:05 +0100
commit0b70736943cc34c842f3fe13be96127e8251ad3f (patch)
tree2365db4a5de2314d8d6aa57f64b7894efe0be2df /src
parent8dc4a182ee7cfbb2e560cbc3d93653fbdf240449 (diff)
downloadrabbitmq-server-git-0b70736943cc34c842f3fe13be96127e8251ad3f.tar.gz
Move clients and cref_to_msg_ids from dicts to maps
Maps have better performance and readability then dicts.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_msg_store.erl35
1 files changed, 18 insertions, 17 deletions
diff --git a/src/rabbit_msg_store.erl b/src/rabbit_msg_store.erl
index 809ec4fda3..e3f1af52a9 100644
--- a/src/rabbit_msg_store.erl
+++ b/src/rabbit_msg_store.erl
@@ -703,7 +703,7 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
dying_clients = DyingClients }) ->
- State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
+ State #msstate { cref_to_msg_ids = maps:remove(CRef, CTM),
dying_clients = maps:remove(CRef, DyingClients) }.
@@ -740,7 +740,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
{CleanShutdown, IndexState, ClientRefs1} =
recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
ClientRefs, Dir, Name),
- Clients = dict:from_list(
+ Clients = maps:from_list(
[{CRef, {undefined, undefined, undefined}} ||
CRef <- ClientRefs1]),
%% CleanShutdown => msg location index and file_summary both
@@ -790,7 +790,7 @@ init([Type, BaseDir, ClientRefs, StartupFunState]) ->
clients = Clients,
successfully_recovered = CleanShutdown,
file_size_limit = FileSizeLimit,
- cref_to_msg_ids = dict:new(),
+ cref_to_msg_ids = #{},
credit_disc_bound = CreditDiscBound
},
%% If we didn't recover the msg location index then we need to
@@ -843,7 +843,7 @@ handle_call({new_client_state, CRef, CPid, MsgOnDiskFun, CloseFDsFun}, _From,
flying_ets = FlyingEts,
clients = Clients,
gc_pid = GCPid }) ->
- Clients1 = dict:store(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
+ Clients1 = maps:put(CRef, {CPid, MsgOnDiskFun, CloseFDsFun}, Clients),
erlang:monitor(process, CPid),
reply({IndexState, IndexModule, Dir, GCPid, FileHandlesEts, FileSummaryEts,
CurFileCacheEts, FlyingEts},
@@ -874,7 +874,7 @@ handle_cast({client_dying, CRef},
handle_cast({client_delete, CRef},
State = #msstate { clients = Clients }) ->
- State1 = State #msstate { clients = dict:erase(CRef, Clients) },
+ State1 = State #msstate { clients = maps:remove(CRef, Clients) },
noreply(clear_client(CRef, State1));
handle_cast({write, CRef, MsgId, Flow},
@@ -882,7 +882,7 @@ handle_cast({write, CRef, MsgId, Flow},
clients = Clients,
credit_disc_bound = CreditDiscBound }) ->
case Flow of
- flow -> {CPid, _, _} = dict:fetch(CRef, Clients),
+ flow -> {CPid, _, _} = maps:get(CRef, Clients),
%% We are going to process a message sent by the
%% rabbit_amqqueue_process. Now we are accessing the
%% msg_store process dictionary.
@@ -1003,7 +1003,7 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
- case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+ case store_recovery_terms([{client_refs, maps:keys(Clients)},
{index_module, IndexModule}], Dir) of
ok ->
rabbit_log:info("Message store for directory '~s' is stopped", [Dir]),
@@ -1035,12 +1035,12 @@ reply(Reply, State) ->
next_state(State = #msstate { sync_timer_ref = undefined,
cref_to_msg_ids = CTM }) ->
- case dict:size(CTM) of
+ case maps:size(CTM) of
0 -> {State, hibernate};
_ -> {start_sync_timer(State), 0}
end;
next_state(State = #msstate { cref_to_msg_ids = CTM }) ->
- case dict:size(CTM) of
+ case maps:size(CTM) of
0 -> {stop_sync_timer(State), hibernate};
_ -> {State, 0}
end.
@@ -1055,7 +1055,7 @@ stop_sync_timer(State) ->
internal_sync(State = #msstate { current_file_handle = CurHdl,
cref_to_msg_ids = CTM }) ->
State1 = stop_sync_timer(State),
- CGs = dict:fold(fun (CRef, MsgIds, NS) ->
+ CGs = maps:fold(fun (CRef, MsgIds, NS) ->
case gb_sets:is_empty(MsgIds) of
true -> NS;
false -> [{CRef, MsgIds} | NS]
@@ -1327,7 +1327,7 @@ orddict_store(Key, Val, Dict) ->
update_pending_confirms(Fun, CRef,
State = #msstate { clients = Clients,
cref_to_msg_ids = CTM }) ->
- case dict:fetch(CRef, Clients) of
+ case maps:get(CRef, Clients) of
{_CPid, undefined, _CloseFDsFun} -> State;
{_CPid, MsgOnDiskFun, _CloseFDsFun} -> CTM1 = Fun(MsgOnDiskFun, CTM),
State #msstate {
@@ -1337,21 +1337,22 @@ update_pending_confirms(Fun, CRef,
record_pending_confirm(CRef, MsgId, State) ->
update_pending_confirms(
fun (_MsgOnDiskFun, CTM) ->
- dict:update(CRef, fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
- gb_sets:singleton(MsgId), CTM)
+ maps:update_with(CRef,
+ fun (MsgIds) -> gb_sets:add(MsgId, MsgIds) end,
+ gb_sets:singleton(MsgId), CTM)
end, CRef, State).
client_confirm(CRef, MsgIds, ActionTaken, State) ->
update_pending_confirms(
fun (MsgOnDiskFun, CTM) ->
- case dict:find(CRef, CTM) of
+ case maps:find(CRef, CTM) of
{ok, Gs} -> MsgOnDiskFun(gb_sets:intersection(Gs, MsgIds),
ActionTaken),
MsgIds1 = rabbit_misc:gb_sets_difference(
Gs, MsgIds),
case gb_sets:is_empty(MsgIds1) of
- true -> dict:erase(CRef, CTM);
- false -> dict:store(CRef, MsgIds1, CTM)
+ true -> maps:remove(CRef, CTM);
+ false -> maps:put(CRef, MsgIds1, CTM)
end;
error -> CTM
end
@@ -1419,7 +1420,7 @@ mark_handle_to_close(ClientRefs, FileHandlesEts, File, Invoke) ->
[ begin
case (ets:update_element(FileHandlesEts, Key, {2, close})
andalso Invoke) of
- true -> case dict:fetch(Ref, ClientRefs) of
+ true -> case maps:get(Ref, ClientRefs) of
{_CPid, _MsgOnDiskFun, undefined} ->
ok;
{_CPid, _MsgOnDiskFun, CloseFDsFun} ->