summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorMatthew Sackman <matthew@rabbitmq.com>2011-05-23 16:45:27 +0100
committerMatthew Sackman <matthew@rabbitmq.com>2011-05-23 16:45:27 +0100
commitef7511bd205f45762757e9bb6c1fb00a6b824cdf (patch)
tree0f3984300f4c1b69c25c3de8c2f9a88e8eb08eda /src
parent1cce55e21613fbf6b239f0fd23778da47ad5dd9f (diff)
downloadrabbitmq-server-git-ef7511bd205f45762757e9bb6c1fb00a6b824cdf.tar.gz
Fix the remaining memory leak
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_mirror_queue_slave.erl172
1 files changed, 93 insertions, 79 deletions
diff --git a/src/rabbit_mirror_queue_slave.erl b/src/rabbit_mirror_queue_slave.erl
index 558e372eb4..b6aaecb79f 100644
--- a/src/rabbit_mirror_queue_slave.erl
+++ b/src/rabbit_mirror_queue_slave.erl
@@ -59,7 +59,7 @@
sync_timer_ref,
rate_timer_ref,
- sender_queues, %% :: Pid -> MsgQ
+ sender_queues, %% :: Pid -> {Q {Msg, Bool}, Set MsgId}
msg_id_ack, %% :: MsgId -> AckTag
ack_num,
@@ -500,7 +500,7 @@ promote_me(From, #state { q = Q,
{MsgId, {published, ChPid, MsgSeqNo}} <- dict:to_list(MS)]),
NumAckTags = [NumAckTag || {_MsgId, NumAckTag} <- dict:to_list(MA)],
AckTags = [AckTag || {_Num, AckTag} <- lists:sort(NumAckTags)],
- Deliveries = [Delivery || {_ChPid, PubQ} <- dict:to_list(SQ),
+ Deliveries = [Delivery || {_ChPid, {PubQ, _PendCh}} <- dict:to_list(SQ),
{Delivery, true} <- queue:to_list(PubQ)],
QueueState = rabbit_amqqueue_process:init_with_backing_queue_state(
Q1, rabbit_mirror_queue_master, MasterState, RateTRef,
@@ -610,47 +610,65 @@ maybe_enqueue_message(
sender = ChPid,
txn = none },
EnqueueOnPromotion,
- State = #state { sender_queues = SQ,
- msg_id_status = MS }) ->
+ State = #state { sender_queues = SQ, msg_id_status = MS }) ->
State1 = ensure_monitoring(ChPid, State),
%% We will never see {published, ChPid, MsgSeqNo} here.
case dict:find(MsgId, MS) of
error ->
- MQ = case dict:find(ChPid, SQ) of
- {ok, MQ1} -> MQ1;
- error -> queue:new()
- end,
- SQ1 = dict:store(ChPid,
- queue:in({Delivery, EnqueueOnPromotion}, MQ), SQ),
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ MQ1 = queue:in({Delivery, EnqueueOnPromotion}, MQ),
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh}, SQ),
State1 #state { sender_queues = SQ1 };
{ok, {confirmed, ChPid}} ->
%% BQ has confirmed it but we didn't know what the
%% msg_seq_no was at the time. We do now!
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- State1 #state { msg_id_status = dict:erase(MsgId, MS) };
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { sender_queues = SQ1,
+ msg_id_status = dict:erase(MsgId, MS) };
{ok, {published, ChPid}} ->
%% It was published to the BQ and we didn't know the
%% msg_seq_no so couldn't confirm it at the time.
case needs_confirming(Delivery, State1) of
never ->
- State1 #state { msg_id_status = dict:erase(MsgId, MS) };
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 };
eventually ->
State1 #state {
msg_id_status =
dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS) };
immediately ->
ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- State1 #state { msg_id_status = dict:erase(MsgId, MS) }
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
end;
{ok, discarded} ->
%% We've already heard from GM that the msg is to be
%% discarded. We won't see this again.
- State1 #state { msg_id_status = dict:erase(MsgId, MS) }
+ SQ1 = remove_from_pending_ch(MsgId, ChPid, SQ),
+ State1 #state { msg_id_status = dict:erase(MsgId, MS),
+ sender_queues = SQ1 }
end;
maybe_enqueue_message(_Delivery, _EnqueueOnPromotion, State) ->
%% We don't support txns in mirror queues.
State.
+get_sender_queue(ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error -> {queue:new(), sets:new()};
+ {ok, Val} -> Val
+ end.
+
+remove_from_pending_ch(MsgId, ChPid, SQ) ->
+ case dict:find(ChPid, SQ) of
+ error ->
+ SQ;
+ {ok, {MQ, PendingCh}} ->
+ dict:store(ChPid, {MQ, sets:del_element(MsgId, PendingCh)}, SQ)
+ end.
+
process_instruction(
{publish, Deliver, ChPid, MsgProps, Msg = #basic_message { id = MsgId }},
State = #state { sender_queues = SQ,
@@ -667,46 +685,39 @@ process_instruction(
%% that we've seen the msg_id confirmed until we can associate it
%% with a msg_seq_no.
State1 = ensure_monitoring(ChPid, State),
- MS1 = dict:store(MsgId, {published, ChPid}, MS),
- {SQ1, MS2} =
- case dict:find(ChPid, SQ) of
- error ->
- {SQ, MS1};
- {ok, MQ} ->
- case queue:out(MQ) of
- {empty, _MQ} ->
- {SQ, MS1};
- {{value, {Delivery = #delivery {
- msg_seq_no = MsgSeqNo,
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ1} ->
- %% We received the msg from the channel
- %% first. Thus we need to deal with confirms
- %% here.
- {dict:store(ChPid, MQ1, SQ),
- case needs_confirming(Delivery, State1) of
- never ->
- MS;
- eventually ->
- dict:store(
- MsgId, {published, ChPid, MsgSeqNo}, MS);
- immediately ->
- ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
- MS
- end};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} ->
- %% The instruction was sent to us before we
- %% were within the mirror_pids within the
- %% #amqqueue{} record. We'll never receive the
- %% message directly from the channel. And the
- %% channel will not be expecting any confirms
- %% from us.
- {SQ, MS}
- end
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ2} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid}, MS)};
+ {{value, {Delivery = #delivery {
+ msg_seq_no = MsgSeqNo,
+ message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We received the msg from the channel first. Thus we
+ %% need to deal with confirms here.
+ case needs_confirming(Delivery, State1) of
+ never ->
+ {MQ2, PendingCh, MS};
+ eventually ->
+ {MQ2, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, {published, ChPid, MsgSeqNo}, MS)};
+ immediately ->
+ ok = rabbit_channel:confirm(ChPid, [MsgSeqNo]),
+ {MQ2, PendingCh, MS}
+ end;
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ %% The instruction was sent to us before we were
+ %% within the mirror_pids within the #amqqueue{}
+ %% record. We'll never receive the message directly
+ %% from the channel. And the channel will not be
+ %% expecting any confirms from us.
+ {MQ, PendingCh, MS}
end,
- State2 = State1 #state { sender_queues = SQ1,
- msg_id_status = MS2 },
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
+ State2 = State1 #state { sender_queues = SQ1, msg_id_status = MS1 },
{ok,
case Deliver of
@@ -727,33 +738,28 @@ process_instruction({discard, ChPid, Msg = #basic_message { id = MsgId }},
%% Many of the comments around the publish head above apply here
%% too.
State1 = ensure_monitoring(ChPid, State),
- MS1 = dict:store(MsgId, discarded, MS),
- {SQ1, MS2} =
- case dict:find(ChPid, SQ) of
- error ->
- {SQ, MS1};
- {ok, MQ} ->
- case queue:out(MQ) of
- {empty, _MQ} ->
- {SQ, MS1};
- {{value, {#delivery {
- message = #basic_message { id = MsgId } },
- _EnqueueOnPromotion}}, MQ1} ->
- %% We've already seen it from the channel,
- %% we're not going to see this again, so don't
- %% add it to MS
- {dict:store(ChPid, MQ1, SQ), MS};
- {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ1} ->
- %% The instruction was sent to us before we
- %% were within the mirror_pids within the
- %% #amqqueue{} record. We'll never receive the
- %% message directly from the channel.
- {SQ, MS}
- end
+ {MQ, PendingCh} = get_sender_queue(ChPid, SQ),
+ {MQ1, PendingCh1, MS1} =
+ case queue:out(MQ) of
+ {empty, _MQ} ->
+ {MQ, sets:add_element(MsgId, PendingCh),
+ dict:store(MsgId, discarded, MS)};
+ {{value, {#delivery { message = #basic_message { id = MsgId } },
+ _EnqueueOnPromotion}}, MQ2} ->
+ %% We've already seen it from the channel, we're not
+ %% going to see this again, so don't add it to MS
+ {MQ2, PendingCh, MS};
+ {{value, {#delivery {}, _EnqueueOnPromotion}}, _MQ2} ->
+ %% The instruction was sent to us before we were
+ %% within the mirror_pids within the #amqqueue{}
+ %% record. We'll never receive the message directly
+ %% from the channel.
+ {MQ, PendingCh, MS}
end,
+ SQ1 = dict:store(ChPid, {MQ1, PendingCh1}, SQ),
BQS1 = BQ:discard(Msg, ChPid, BQS),
{ok, State1 #state { sender_queues = SQ1,
- msg_id_status = MS2,
+ msg_id_status = MS1,
backing_queue_state = BQS1 }};
process_instruction({set_length, Length},
State = #state { backing_queue = BQ,
@@ -815,15 +821,23 @@ process_instruction({requeue, MsgPropsFun, MsgIds},
end};
process_instruction({sender_death, ChPid},
State = #state { sender_queues = SQ,
+ msg_id_status = MS,
known_senders = KS }) ->
{ok, case dict:find(ChPid, KS) of
error ->
State;
{ok, MRef} ->
true = erlang:demonitor(MRef),
- KS1 = dict:erase(ChPid, KS),
- SQ1 = dict:erase(ChPid, SQ),
- State #state { sender_queues = SQ1, known_senders = KS1 }
+ MS1 = case dict:find(ChPid, SQ) of
+ error ->
+ MS;
+ {ok, {_MQ, PendingCh}} ->
+ lists:foldl(fun dict:erase/2, MS,
+ sets:to_list(PendingCh))
+ end,
+ State #state { sender_queues = dict:erase(ChPid, SQ),
+ msg_id_status = MS1,
+ known_senders = dict:erase(ChPid, KS) }
end};
process_instruction(delete_and_terminate,
State = #state { backing_queue = BQ,