summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorDaniil Fedotov <dfedotov@pivotal.io>2017-04-21 17:04:48 +0100
committerDaniil Fedotov <dfedotov@pivotal.io>2017-04-21 17:04:48 +0100
commite07ca0eacc0f2db77685a48253b3457a22c0e269 (patch)
tree325b6a9876526715a7152ece6d4f6b9443ffce5a /src
parentb5a07fa724a9d31ebd8d42891518addcf1dd09d6 (diff)
downloadrabbitmq-server-git-e07ca0eacc0f2db77685a48253b3457a22c0e269.tar.gz
Replace dicts with maps for queue mirroring logic.
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_amqqueue_process.erl2
-rw-r--r--src/rabbit_mirror_queue_master.erl28
-rw-r--r--src/rabbit_mirror_queue_slave.erl56
-rw-r--r--src/rabbit_queue_consumers.erl5
4 files changed, 46 insertions, 45 deletions
diff --git a/src/rabbit_amqqueue_process.erl b/src/rabbit_amqqueue_process.erl
index e9f91041e7..c52d329392 100644
--- a/src/rabbit_amqqueue_process.erl
+++ b/src/rabbit_amqqueue_process.erl
@@ -99,7 +99,7 @@
-spec info_keys() -> rabbit_types:info_keys().
-spec init_with_backing_queue_state
(rabbit_types:amqqueue(), atom(), tuple(), any(),
- [rabbit_types:delivery()], pmon:pmon(), dict:dict()) ->
+ [rabbit_types:delivery()], pmon:pmon(), gb_trees:tree()) ->
#q{}.
%%----------------------------------------------------------------------------
diff --git a/src/rabbit_mirror_queue_master.erl b/src/rabbit_mirror_queue_master.erl
index b006e37eb2..b9952178e0 100644
--- a/src/rabbit_mirror_queue_master.erl
+++ b/src/rabbit_mirror_queue_master.erl
@@ -57,13 +57,13 @@
coordinator :: pid(),
backing_queue :: atom(),
backing_queue_state :: any(),
- seen_status :: dict:dict(),
+ seen_status :: map(),
confirmed :: [rabbit_guid:guid()],
known_senders :: sets:set()
}.
-spec promote_backing_queue_state
(rabbit_amqqueue:name(), pid(), atom(), any(), pid(), [any()],
- dict:dict(), [pid()]) ->
+ map(), [pid()]) ->
master_state().
-spec sender_death_fun() -> death_fun().
@@ -127,7 +127,7 @@ init_with_existing_bq(Q = #amqqueue{name = QName}, BQ, BQS) ->
coordinator = CPid,
backing_queue = BQ,
backing_queue_state = BQS,
- seen_status = dict:new(),
+ seen_status = #{},
confirmed = [],
known_senders = sets:new(),
wait_timeout = rabbit_misc:get_env(rabbit, slave_wait_timeout, 15000) };
@@ -266,7 +266,7 @@ publish(Msg = #basic_message { id = MsgId }, MsgProps, IsDelivered, ChPid, Flow,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
BQS1 = BQ:publish(Msg, MsgProps, IsDelivered, ChPid, Flow, BQS),
@@ -281,7 +281,7 @@ batch_publish(Publishes, ChPid, Flow,
lists:foldl(fun ({Msg = #basic_message { id = MsgId },
MsgProps, _IsDelivered}, {Pubs, false, Sizes}) ->
{[{Msg, MsgProps, true} | Pubs], %% [0]
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {[], false, 0}, Publishes),
Publishes2 = lists:reverse(Publishes1),
@@ -298,7 +298,7 @@ publish_delivered(Msg = #basic_message { id = MsgId }, MsgProps,
seen_status = SS,
backing_queue = BQ,
backing_queue_state = BQS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {publish_delivered, ChPid, Flow, MsgProps, Msg},
rabbit_basic:msg_size(Msg)),
{AckTag, BQS1} = BQ:publish_delivered(Msg, MsgProps, ChPid, Flow, BQS),
@@ -313,7 +313,7 @@ batch_publish_delivered(Publishes, ChPid, Flow,
{false, MsgSizes} =
lists:foldl(fun ({Msg = #basic_message { id = MsgId }, _MsgProps},
{false, Sizes}) ->
- {false = dict:is_key(MsgId, SS), %% ASSERTION
+ {false = maps:is_key(MsgId, SS), %% ASSERTION
Sizes + rabbit_basic:msg_size(Msg)}
end, {false, 0}, Publishes),
ok = gm:broadcast(GM, {batch_publish_delivered, ChPid, Flow, Publishes},
@@ -326,7 +326,7 @@ discard(MsgId, ChPid, Flow, State = #state { gm = GM,
backing_queue = BQ,
backing_queue_state = BQS,
seen_status = SS }) ->
- false = dict:is_key(MsgId, SS), %% ASSERTION
+ false = maps:is_key(MsgId, SS), %% ASSERTION
ok = gm:broadcast(GM, {discard, ChPid, Flow, MsgId}),
ensure_monitoring(ChPid,
State #state { backing_queue_state =
@@ -353,7 +353,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
lists:foldl(
fun (MsgId, {MsgIdsN, SSN}) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, SSN) of
+ case maps:find(MsgId, SSN) of
error ->
{[MsgId | MsgIdsN], SSN};
{ok, published} ->
@@ -364,7 +364,7 @@ drain_confirmed(State = #state { backing_queue = BQ,
%% consequently we need to filter out the
%% confirm here. We will issue the confirm
%% when we see the publish from the channel.
- {MsgIdsN, dict:store(MsgId, confirmed, SSN)};
+ {MsgIdsN, maps:put(MsgId, confirmed, SSN)};
{ok, confirmed} ->
%% Well, confirms are racy by definition.
{[MsgId | MsgIdsN], SSN}
@@ -457,7 +457,7 @@ msg_rates(#state { backing_queue = BQ, backing_queue_state = BQS }) ->
info(backing_queue_status,
State = #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(backing_queue_status, BQS) ++
- [ {mirror_seen, dict:size(State #state.seen_status)},
+ [ {mirror_seen, maps:size(State #state.seen_status)},
{mirror_senders, sets:size(State #state.known_senders)} ];
info(Item, #state { backing_queue = BQ, backing_queue_state = BQS }) ->
BQ:info(Item, BQS).
@@ -480,7 +480,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% it.
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, SS) of
+ case maps:find(MsgId, SS) of
error ->
%% We permit the underlying BQ to have a peek at it, but
%% only if we ourselves are not filtering out the msg.
@@ -494,7 +494,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% immediately after calling is_duplicate). The msg is
%% invalid. We will not see this again, nor will we be
%% further involved in confirming this message, so erase.
- {true, State #state { seen_status = dict:erase(MsgId, SS) }};
+ {true, State #state { seen_status = maps:remove(MsgId, SS) }};
{ok, Disposition}
when Disposition =:= confirmed
%% It got published when we were a slave via gm, and
@@ -509,7 +509,7 @@ is_duplicate(Message = #basic_message { id = MsgId },
%% Message was discarded while we were a slave. Confirm now.
%% As above, amqqueue_process will have the entry for the
%% msg_id_to_channel mapping.
- {true, State #state { seen_status = dict:erase(MsgId, SS),
+ {true, State #state { seen_status = maps:remove(MsgId, SS),
confirmed = [MsgId | Confirmed] }}
end.
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 61623c9441..748a5afdf5 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -129,10 +129,10 @@ handle_go(Q = #amqqueue{name = QName}) ->
rate_timer_ref = undefined,
sync_timer_ref = undefined,
- sender_queues = dict:new(),
- msg_id_ack = dict:new(),
+ sender_queues = #{},
+ msg_id_ack = #{},
- msg_id_status = dict:new(),
+ msg_id_status = #{},
known_senders = pmon:new(delegate),
depth_delta = undefined
@@ -310,7 +310,7 @@ handle_cast({sync_start, Ref, Syncer},
State1 = #state{rate_timer_ref = TRef} = ensure_rate_timer(State),
S = fun({MA, TRefN, BQSN}) ->
State1#state{depth_delta = undefined,
- msg_id_ack = dict:from_list(MA),
+ msg_id_ack = maps:from_list(MA),
rate_timer_ref = TRefN,
backing_queue_state = BQSN}
end,
@@ -546,7 +546,7 @@ send_or_record_confirm(published, #delivery { sender = ChPid,
id = MsgId,
is_persistent = true } },
MS, #state { q = #amqqueue { durable = true } }) ->
- dict:store(MsgId, {published, ChPid, MsgSeqNo} , MS);
+ maps:put(MsgId, {published, ChPid, MsgSeqNo} , MS);
send_or_record_confirm(_Status, #delivery { sender = ChPid,
confirm = true,
msg_seq_no = MsgSeqNo },
@@ -559,7 +559,7 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
lists:foldl(
fun (MsgId, {CMsN, MSN} = Acc) ->
%% We will never see 'discarded' here
- case dict:find(MsgId, MSN) of
+ case maps:find(MsgId, MSN) of
error ->
%% If it needed confirming, it'll have
%% already been done.
@@ -567,12 +567,12 @@ confirm_messages(MsgIds, State = #state { msg_id_status = MS }) ->
{ok, published} ->
%% Still not seen it from the channel, just
%% record that it's been confirmed.
- {CMsN, dict:store(MsgId, confirmed, MSN)};
+ {CMsN, maps:put(MsgId, confirmed, MSN)};
{ok, {published, ChPid, MsgSeqNo}} ->
%% Seen from both GM and Channel. Can now
%% confirm.
{rabbit_misc:gb_trees_cons(ChPid, MsgSeqNo, CMsN),
- dict:erase(MsgId, MSN)};
+ maps:remove(MsgId, MSN)};
{ok, confirmed} ->
%% It's already been confirmed. This is
%% probably it's been both sync'd to disk
@@ -672,21 +672,21 @@ promote_me(From, #state { q = Q = #amqqueue { name = QName },
%% Master, or MTC in queue_process.
St = [published, confirmed, discarded],
- SS = dict:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
- AckTags = [AckTag || {_MsgId, AckTag} <- dict:to_list(MA)],
+ SS = maps:filter(fun (_MsgId, Status) -> lists:member(Status, St) end, MS),
+ AckTags = [AckTag || {_MsgId, AckTag} <- maps:to_list(MA)],
MasterState = rabbit_mirror_queue_master:promote_backing_queue_state(
QName, CPid, BQ, BQS, GM, AckTags, SS, MPids),
- MTC = dict:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
+ MTC = maps:fold(fun (MsgId, {published, ChPid, MsgSeqNo}, MTC0) ->
gb_trees:insert(MsgId, {ChPid, MsgSeqNo}, MTC0);
(_Msgid, _Status, MTC0) ->
MTC0
end, gb_trees:empty(), MS),
Deliveries = [promote_delivery(Delivery) ||
- {_ChPid, {PubQ, _PendCh, _ChState}} <- dict:to_list(SQ),
+ {_ChPid, {PubQ, _PendCh, _ChState}} <- maps:to_list(SQ),
Delivery <- queue:to_list(PubQ)],
- AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- dict:to_list(SQ)],
+ AwaitGmDown = [ChPid || {ChPid, {_, _, down_from_ch}} <- maps:to_list(SQ)],
KS1 = lists:foldl(fun (ChPid0, KS0) ->
pmon:demonitor(ChPid0, KS0)
end, KS, AwaitGmDown),
@@ -798,20 +798,20 @@ forget_sender(Down1, Down2) when Down1 =/= Down2 -> true.
maybe_forget_sender(ChPid, ChState, State = #state { sender_queues = SQ,
msg_id_status = MS,
known_senders = KS }) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
State;
{ok, {MQ, PendCh, ChStateRecord}} ->
case forget_sender(ChState, ChStateRecord) of
true ->
credit_flow:peer_down(ChPid),
- State #state { sender_queues = dict:erase(ChPid, SQ),
+ State #state { sender_queues = maps:remove(ChPid, SQ),
msg_id_status = lists:foldl(
- fun dict:erase/2,
+ fun maps:remove/2,
MS, sets:to_list(PendCh)),
known_senders = pmon:demonitor(ChPid, KS) };
false ->
- SQ1 = dict:store(ChPid, {MQ, PendCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ, PendCh, ChState}, SQ),
State #state { sender_queues = SQ1 }
end
end.
@@ -823,32 +823,32 @@ maybe_enqueue_message(
send_mandatory(Delivery), %% must do this before confirms
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
- case dict:find(MsgId, MS) of
+ case maps:find(MsgId, MS) of
error ->
{MQ, PendingCh, ChState} = get_sender_queue(ChPid, SQ),
MQ1 = queue:in(Delivery, MQ),
- SQ1 = dict:store(ChPid, {MQ1, PendingCh, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh, ChState}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, Status} ->
MS1 = send_or_record_confirm(
- Status, Delivery, dict:erase(MsgId, MS), State1),
+ Status, Delivery, maps:remove(MsgId, MS), State1),
SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
State1 #state { msg_id_status = MS1,
sender_queues = SQ1 }
end.
get_sender_queue(ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error -> {queue:new(), sets:new(), running};
{ok, Val} -> Val
end.
remove_from_pending_ch(MsgId, ChPid, SQ) ->
- case dict:find(ChPid, SQ) of
+ case maps:find(ChPid, SQ) of
error ->
SQ;
{ok, {MQ, PendingCh, ChState}} ->
- dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
+ maps:put(ChPid, {MQ, sets:del_element(MsgId, PendingCh), ChState},
SQ)
end.
@@ -865,7 +865,7 @@ publish_or_discard(Status, ChPid, MsgId,
case queue:out(MQ) of
{empty, _MQ2} ->
{MQ, sets:add_element(MsgId, PendingCh),
- dict:store(MsgId, Status, MS)};
+ maps:put(MsgId, Status, MS)};
{{value, Delivery = #delivery {
message = #basic_message { id = MsgId } }}, MQ2} ->
{MQ2, PendingCh,
@@ -880,7 +880,7 @@ publish_or_discard(Status, ChPid, MsgId,
%% expecting any confirms from us.
{MQ, PendingCh, MS}
end,
- SQ1 = dict:store(ChPid, {MQ1, PendingCh1, ChState}, SQ),
+ SQ1 = maps:put(ChPid, {MQ1, PendingCh1, ChState}, SQ),
State1 #state { sender_queues = SQ1, msg_id_status = MS1 }.
@@ -1002,9 +1002,9 @@ msg_ids_to_acktags(MsgIds, MA) ->
{AckTags, MA1} =
lists:foldl(
fun (MsgId, {Acc, MAN}) ->
- case dict:find(MsgId, MA) of
+ case maps:find(MsgId, MA) of
error -> {Acc, MAN};
- {ok, AckTag} -> {[AckTag | Acc], dict:erase(MsgId, MAN)}
+ {ok, AckTag} -> {[AckTag | Acc], maps:remove(MsgId, MAN)}
end
end, {[], MA}, MsgIds),
{lists:reverse(AckTags), MA1}.
@@ -1012,7 +1012,7 @@ msg_ids_to_acktags(MsgIds, MA) ->
maybe_store_ack(false, _MsgId, _AckTag, State) ->
State;
maybe_store_ack(true, MsgId, AckTag, State = #state { msg_id_ack = MA }) ->
- State #state { msg_id_ack = dict:store(MsgId, AckTag, MA) }.
+ State #state { msg_id_ack = maps:put(MsgId, AckTag, MA) }.
set_delta(0, State = #state { depth_delta = undefined }) ->
ok = record_synchronised(State#state.q),
diff --git a/src/rabbit_queue_consumers.erl b/src/rabbit_queue_consumers.erl
index 4e9715fba2..f13a46fcf3 100644
--- a/src/rabbit_queue_consumers.erl
+++ b/src/rabbit_queue_consumers.erl
@@ -64,7 +64,8 @@
-spec max_active_priority(state()) -> integer() | 'infinity' | 'empty'.
-spec inactive(state()) -> boolean().
-spec all(state()) -> [{ch(), rabbit_types:ctag(), boolean(),
- non_neg_integer(), rabbit_framing:amqp_table()}].
+ non_neg_integer(), rabbit_framing:amqp_table(),
+ rabbit_types:username()}].
-spec count() -> non_neg_integer().
-spec unacknowledged_message_count() -> non_neg_integer().
-spec add(ch(), rabbit_types:ctag(), boolean(), pid(), boolean(),
@@ -280,7 +281,7 @@ subtract_acks([T | TL] = AckTags, Prefix, CTagCounts, AckQ) ->
orddict:update_counter(CTag, 1, CTagCounts), QTail);
{{value, V}, QTail} ->
subtract_acks(AckTags, [V | Prefix], CTagCounts, QTail);
- {empty, _} ->
+ {empty, _} ->
subtract_acks([], Prefix, CTagCounts, AckQ)
end.