summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-02 09:31:39 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-02 09:31:39 +0100
commitbf57eeecce34d7d4d371c0e47d42f3dfe26d9fa4 (patch)
tree2a5ba5133f13caaa93b9cc0abf776887df7b3025
parentc5b933c07e2390b372171b8e2ac574b866ea7611 (diff)
downloadrabbitmq-server-git-bf57eeecce34d7d4d371c0e47d42f3dfe26d9fa4.tar.gz
Fix crash bug when consumers are removed
And the queue is shut down. This is a regression introduced by the queue type refactoring.
-rw-r--r--src/rabbit_channel.erl32
-rw-r--r--src/rabbit_classic_queue.erl11
-rw-r--r--src/rabbit_queue_type.erl3
3 files changed, 24 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index f20e7463c3..0887591ad5 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -142,7 +142,7 @@
%% consumer details: #amqqueue record, acknowledgement mode,
%% consumer exclusivity, etc
consumer_mapping,
- %% a map of queue pids to consumer tag lists
+ %% a map of queue names to consumer tag lists
queue_consumers,
%% timer used to emit statistics
stats_timer,
@@ -743,7 +743,7 @@ handle_cast({queue_event, QRef, Evt},
State = handle_queue_actions(Actions, State1),
noreply_coalesce(State);
eol ->
- State1 = handle_consuming_queue_down_or_eol(QRef, QRef, State0),
+ State1 = handle_consuming_queue_down_or_eol(QRef, State0),
{ConfirmMXs, UC1} =
rabbit_confirms:remove_queue(QRef, State1#ch.unconfirmed),
%% Deleted queue is a special case.
@@ -789,7 +789,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason},
State = handle_queue_actions(Actions, State1),
noreply_coalesce(State);
{eol, QRef} ->
- State1 = handle_consuming_queue_down_or_eol(QRef, QRef, State0),
+ State1 = handle_consuming_queue_down_or_eol(QRef, State0),
{ConfirmMXs, UC1} =
rabbit_confirms:remove_queue(QRef, State1#ch.unconfirmed),
%% Deleted queue is a special case.
@@ -1457,16 +1457,17 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait},
%% Spec requires we ignore this situation.
return_ok(State, NoWait, OkMsg);
{ok, {Q, _CParams}} when ?is_amqqueue(Q) ->
- QPid = amqqueue:get_pid(Q),
+ QName = amqqueue:get_name(Q),
+
ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping),
- QRef = qpid_to_ref(QPid),
+ % QRef = qpid_to_ref(QPid),
QCons1 =
- case maps:find(QRef, QCons) of
+ case maps:find(QName, QCons) of
error -> QCons;
{ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags),
case gb_sets:is_empty(CTags1) of
- true -> maps:remove(QRef, QCons);
- false -> maps:put(QRef, CTags1, QCons)
+ true -> maps:remove(QName, QCons);
+ false -> maps:put(QName, CTags1, QCons)
end
end,
NewState = State#ch{consumer_mapping = ConsumerMapping1,
@@ -1747,6 +1748,7 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag,
ActualConsumerTag,
{Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}},
ConsumerMapping),
+
State1 = State#ch{consumer_mapping = CM1,
queue_states = QueueStates},
State2 = handle_queue_actions(Actions, State1),
@@ -1775,9 +1777,9 @@ consumer_monitor(ConsumerTag,
QCons1 = maps:put(QRef, CTags1, QCons),
State#ch{queue_consumers = QCons1}.
-handle_consuming_queue_down_or_eol(QRef, QName,
+handle_consuming_queue_down_or_eol(QName,
State = #ch{queue_consumers = QCons}) ->
- ConsumerTags = case maps:find(QRef, QCons) of
+ ConsumerTags = case maps:find(QName, QCons) of
error -> gb_sets:new();
{ok, CTags} -> CTags
end,
@@ -1795,7 +1797,7 @@ handle_consuming_queue_down_or_eol(QRef, QName,
cancel_consumer(CTag, QName, StateN)
end
end
- end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags).
+ end, State#ch{queue_consumers = maps:remove(QName, QCons)}, ConsumerTags).
%% [0] There is a slight danger here that if a queue is deleted and
%% then recreated again the reconsume will succeed even though it was
@@ -2638,12 +2640,6 @@ maybe_cancel_tick_timer(#ch{tick_timer = TRef,
State
end.
-qpid_to_ref(Pid) when is_pid(Pid) -> Pid;
-qpid_to_ref({Name, Node}) when is_atom(Name) andalso is_atom(Node) ->
- Name;
-%% assume it already is a ref
-qpid_to_ref(Ref) -> Ref.
-
now_millis() ->
erlang:monotonic_time(millisecond).
@@ -2712,7 +2708,7 @@ handle_queue_actions(Actions, #ch{} = State0) ->
({deliver, CTag, AckRequired, Msgs}, S0) ->
handle_deliver(CTag, AckRequired, Msgs, S0);
({queue_down, QRef}, S0) ->
- handle_consuming_queue_down_or_eol(QRef, QRef, S0)
+ handle_consuming_queue_down_or_eol(QRef, S0)
end, State0, Actions).
diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl
index 5eea2ba5d3..c383b3fc41 100644
--- a/src/rabbit_classic_queue.erl
+++ b/src/rabbit_classic_queue.erl
@@ -241,6 +241,12 @@ handle_event({reject_publish, SeqNo, _QPid},
handle_event({down, Pid, Info}, #?STATE{qref = QRef,
pid = MasterPid,
unconfirmed = U0} = State0) ->
+ Actions0 = case Pid =:= MasterPid of
+ true ->
+ [{queue_down, QRef}];
+ false ->
+ []
+ end,
case rabbit_misc:is_abnormal_exit(Info) of
false when Info =:= normal andalso Pid == MasterPid ->
%% queue was deleted and masterpid is down
@@ -254,7 +260,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
{Unconfirmed, ConfirmedSeqNos, Rejected} =
settle_seq_nos(MsgSeqNos, Pid, U0, down),
Actions = [{settled, QRef, ConfirmedSeqNos},
- {rejected, QRef, Rejected}],
+ {rejected, QRef, Rejected} | Actions0],
{ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions};
true ->
%% any abnormal exit should be considered a full reject of the
@@ -270,7 +276,8 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef,
end
end, [], U0),
U = maps:without(MsgIds, U0),
- {ok, State0#?STATE{unconfirmed = U}, [{rejected, QRef, MsgIds}]}
+ {ok, State0#?STATE{unconfirmed = U},
+ [{rejected, QRef, MsgIds} | Actions0]}
end.
-spec deliver([{amqqueue:amqqueue(), state()}],
diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl
index f8dae3910e..727ee43faf 100644
--- a/src/rabbit_queue_type.erl
+++ b/src/rabbit_queue_type.erl
@@ -73,7 +73,6 @@
%% The queue type module will then emit a {confirm | reject, [msg_tag()}
%% action to the channel or channel like process when a msg_tag
%% has reached its conclusion
- % unsettled = #{} :: #{msg_tag() => [queue_ref()]},
state :: queue_state()}).
@@ -375,7 +374,7 @@ handle_down(Pid, Info, #?STATE{monitor_registry = Reg} = State0) ->
%% TODO: remove Pid from monitor_registry
case handle_event(QRef, {down, Pid, Info}, State0) of
{ok, State, Actions} ->
- {ok, State, [{queue_down, QRef} | Actions]};
+ {ok, State, Actions};
eol ->
{eol, QRef};
Err ->