diff options
| -rw-r--r-- | src/rabbit_channel.erl | 32 | ||||
| -rw-r--r-- | src/rabbit_classic_queue.erl | 11 | ||||
| -rw-r--r-- | src/rabbit_queue_type.erl | 3 |
3 files changed, 24 insertions, 22 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index f20e7463c3..0887591ad5 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -142,7 +142,7 @@ %% consumer details: #amqqueue record, acknowledgement mode, %% consumer exclusivity, etc consumer_mapping, - %% a map of queue pids to consumer tag lists + %% a map of queue names to consumer tag lists queue_consumers, %% timer used to emit statistics stats_timer, @@ -743,7 +743,7 @@ handle_cast({queue_event, QRef, Evt}, State = handle_queue_actions(Actions, State1), noreply_coalesce(State); eol -> - State1 = handle_consuming_queue_down_or_eol(QRef, QRef, State0), + State1 = handle_consuming_queue_down_or_eol(QRef, State0), {ConfirmMXs, UC1} = rabbit_confirms:remove_queue(QRef, State1#ch.unconfirmed), %% Deleted queue is a special case. @@ -789,7 +789,7 @@ handle_info({'DOWN', _MRef, process, QPid, Reason}, State = handle_queue_actions(Actions, State1), noreply_coalesce(State); {eol, QRef} -> - State1 = handle_consuming_queue_down_or_eol(QRef, QRef, State0), + State1 = handle_consuming_queue_down_or_eol(QRef, State0), {ConfirmMXs, UC1} = rabbit_confirms:remove_queue(QRef, State1#ch.unconfirmed), %% Deleted queue is a special case. @@ -1457,16 +1457,17 @@ handle_method(#'basic.cancel'{consumer_tag = ConsumerTag, nowait = NoWait}, %% Spec requires we ignore this situation. return_ok(State, NoWait, OkMsg); {ok, {Q, _CParams}} when ?is_amqqueue(Q) -> - QPid = amqqueue:get_pid(Q), + QName = amqqueue:get_name(Q), + ConsumerMapping1 = maps:remove(ConsumerTag, ConsumerMapping), - QRef = qpid_to_ref(QPid), + % QRef = qpid_to_ref(QPid), QCons1 = - case maps:find(QRef, QCons) of + case maps:find(QName, QCons) of error -> QCons; {ok, CTags} -> CTags1 = gb_sets:delete(ConsumerTag, CTags), case gb_sets:is_empty(CTags1) of - true -> maps:remove(QRef, QCons); - false -> maps:put(QRef, CTags1, QCons) + true -> maps:remove(QName, QCons); + false -> maps:put(QName, CTags1, QCons) end end, NewState = State#ch{consumer_mapping = ConsumerMapping1, @@ -1747,6 +1748,7 @@ basic_consume(QueueName, NoAck, ConsumerPrefetch, ActualConsumerTag, ActualConsumerTag, {Q, {NoAck, ConsumerPrefetch, ExclusiveConsume, Args}}, ConsumerMapping), + State1 = State#ch{consumer_mapping = CM1, queue_states = QueueStates}, State2 = handle_queue_actions(Actions, State1), @@ -1775,9 +1777,9 @@ consumer_monitor(ConsumerTag, QCons1 = maps:put(QRef, CTags1, QCons), State#ch{queue_consumers = QCons1}. -handle_consuming_queue_down_or_eol(QRef, QName, +handle_consuming_queue_down_or_eol(QName, State = #ch{queue_consumers = QCons}) -> - ConsumerTags = case maps:find(QRef, QCons) of + ConsumerTags = case maps:find(QName, QCons) of error -> gb_sets:new(); {ok, CTags} -> CTags end, @@ -1795,7 +1797,7 @@ handle_consuming_queue_down_or_eol(QRef, QName, cancel_consumer(CTag, QName, StateN) end end - end, State#ch{queue_consumers = maps:remove(QRef, QCons)}, ConsumerTags). + end, State#ch{queue_consumers = maps:remove(QName, QCons)}, ConsumerTags). %% [0] There is a slight danger here that if a queue is deleted and %% then recreated again the reconsume will succeed even though it was @@ -2638,12 +2640,6 @@ maybe_cancel_tick_timer(#ch{tick_timer = TRef, State end. -qpid_to_ref(Pid) when is_pid(Pid) -> Pid; -qpid_to_ref({Name, Node}) when is_atom(Name) andalso is_atom(Node) -> - Name; -%% assume it already is a ref -qpid_to_ref(Ref) -> Ref. - now_millis() -> erlang:monotonic_time(millisecond). @@ -2712,7 +2708,7 @@ handle_queue_actions(Actions, #ch{} = State0) -> ({deliver, CTag, AckRequired, Msgs}, S0) -> handle_deliver(CTag, AckRequired, Msgs, S0); ({queue_down, QRef}, S0) -> - handle_consuming_queue_down_or_eol(QRef, QRef, S0) + handle_consuming_queue_down_or_eol(QRef, S0) end, State0, Actions). diff --git a/src/rabbit_classic_queue.erl b/src/rabbit_classic_queue.erl index 5eea2ba5d3..c383b3fc41 100644 --- a/src/rabbit_classic_queue.erl +++ b/src/rabbit_classic_queue.erl @@ -241,6 +241,12 @@ handle_event({reject_publish, SeqNo, _QPid}, handle_event({down, Pid, Info}, #?STATE{qref = QRef, pid = MasterPid, unconfirmed = U0} = State0) -> + Actions0 = case Pid =:= MasterPid of + true -> + [{queue_down, QRef}]; + false -> + [] + end, case rabbit_misc:is_abnormal_exit(Info) of false when Info =:= normal andalso Pid == MasterPid -> %% queue was deleted and masterpid is down @@ -254,7 +260,7 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef, {Unconfirmed, ConfirmedSeqNos, Rejected} = settle_seq_nos(MsgSeqNos, Pid, U0, down), Actions = [{settled, QRef, ConfirmedSeqNos}, - {rejected, QRef, Rejected}], + {rejected, QRef, Rejected} | Actions0], {ok, State0#?STATE{unconfirmed = Unconfirmed}, Actions}; true -> %% any abnormal exit should be considered a full reject of the @@ -270,7 +276,8 @@ handle_event({down, Pid, Info}, #?STATE{qref = QRef, end end, [], U0), U = maps:without(MsgIds, U0), - {ok, State0#?STATE{unconfirmed = U}, [{rejected, QRef, MsgIds}]} + {ok, State0#?STATE{unconfirmed = U}, + [{rejected, QRef, MsgIds} | Actions0]} end. -spec deliver([{amqqueue:amqqueue(), state()}], diff --git a/src/rabbit_queue_type.erl b/src/rabbit_queue_type.erl index f8dae3910e..727ee43faf 100644 --- a/src/rabbit_queue_type.erl +++ b/src/rabbit_queue_type.erl @@ -73,7 +73,6 @@ %% The queue type module will then emit a {confirm | reject, [msg_tag()} %% action to the channel or channel like process when a msg_tag %% has reached its conclusion - % unsettled = #{} :: #{msg_tag() => [queue_ref()]}, state :: queue_state()}). @@ -375,7 +374,7 @@ handle_down(Pid, Info, #?STATE{monitor_registry = Reg} = State0) -> %% TODO: remove Pid from monitor_registry case handle_event(QRef, {down, Pid, Info}, State0) of {ok, State, Actions} -> - {ok, State, [{queue_down, QRef} | Actions]}; + {ok, State, Actions}; eol -> {eol, QRef}; Err -> |
