diff options
| author | kjnilsson <knilsson@pivotal.io> | 2020-10-02 13:37:25 +0100 |
|---|---|---|
| committer | kjnilsson <knilsson@pivotal.io> | 2020-10-02 13:37:25 +0100 |
| commit | 1f8e23516e1e7dc0d930379ac2f0d05e10412a0d (patch) | |
| tree | 5dd0fe1993c733119cbc8a8a6b4e5101e768f2bc | |
| parent | bf57eeecce34d7d4d371c0e47d42f3dfe26d9fa4 (diff) | |
| download | rabbitmq-server-git-1f8e23516e1e7dc0d930379ac2f0d05e10412a0d.tar.gz | |
Quorum Queue reject publish bug fix
| -rw-r--r-- | src/rabbit_channel.erl | 29 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 2 | ||||
| -rw-r--r-- | src/rabbit_quorum_queue.erl | 40 |
3 files changed, 34 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 0887591ad5..b5c4ca8ca0 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -2076,24 +2076,22 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{ msg_seq_no = MsgSeqNo}, DelQNames}, State0 = #ch{queue_states = QueueStates0}) -> Qs = rabbit_amqqueue:lookup(DelQNames), + AllQueueNames = lists:foldl(fun (Q, Acc) -> + QRef = amqqueue:get_name(Q), + [QRef | Acc] + end, [], Qs), {ok, QueueStates, Actions} = rabbit_queue_type:deliver(Qs, Delivery, QueueStates0), - State1 = handle_queue_actions(Actions, - State0#ch{queue_states = QueueStates}), %% NB: the order here is important since basic.returns must be %% sent before confirms. - %% TODO: fix - HACK TO WORK OUT ALL QREFS - AllDeliveredQRefs = lists:foldl(fun (Q, Acc) -> - QRef = amqqueue:get_name(Q), - [QRef | Acc] - end, [], Qs), - ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs, - Message, State1), - State = process_routing_confirm(Confirm, - AllDeliveredQRefs, - MsgSeqNo, - XName, State1), - case rabbit_event:stats_level(State1, #ch.stats_timer) of + ok = process_routing_mandatory(Mandatory, Qs, Message, State0), + State1 = process_routing_confirm(Confirm, AllQueueNames, + MsgSeqNo, XName, State0), + %% Actions must be processed after registering confirms as actions may + %% contain rejections of publishes + State = handle_queue_actions(Actions, + State1#ch{queue_states = QueueStates}), + case rabbit_event:stats_level(State, #ch.stats_timer) of fine -> ?INCR_STATS(exchange_stats, XName, 1, publish), [?INCR_STATS(queue_exchange_stats, @@ -2132,7 +2130,6 @@ confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) -> {ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC), %% NB: don't call noreply/1 since we don't want to send confirms. record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}). - % record_rejects(RejectMXs, State1). send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) -> State; @@ -2700,7 +2697,7 @@ handle_queue_actions(Actions, #ch{} = State0) -> {ok, MX, U2} -> {U2, [MX | Acc]}; {error, not_found} -> - Acc + {U1, Acc} end end, {S0#ch.unconfirmed, []}, MsgSeqNos), S = S0#ch{unconfirmed = U}, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index a5695fbfcb..050b5afa1f 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -602,7 +602,7 @@ handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) -> handle_ra_event(_, {machine, {queue_status, Status}}, #state{} = State) -> %% just set the queue status - {internal, [], [], State#state{queue_status = Status}}; + {ok, State#state{queue_status = Status}, []}; handle_ra_event(Leader, {machine, leader_change}, #state{leader = Leader} = State) -> %% leader already known diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl index eb02333e43..1f333bdae1 100644 --- a/src/rabbit_quorum_queue.erl +++ b/src/rabbit_quorum_queue.erl @@ -203,7 +203,7 @@ ra_machine_config(Q) when ?is_amqqueue(Q) -> max_in_memory_bytes => MaxMemoryBytes, single_active_consumer_on => single_active_consumer_on(Q), delivery_limit => DeliveryLimit, - overflow_strategy => overflow(Overflow, drop_head), + overflow_strategy => overflow(Overflow, drop_head, QName), created => erlang:system_time(millisecond), expires => Expires }. @@ -735,7 +735,6 @@ stateless_deliver(ServerId, Delivery) -> rabbit_fifo_client:state()) -> {ok | slow, rabbit_fifo_client:state()} | {reject_publish, rabbit_fifo_client:state()}. - deliver(false, Delivery, QState0) -> case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of {ok, _} = Res -> Res; @@ -744,18 +743,8 @@ deliver(false, Delivery, QState0) -> {ok, State} end; deliver(true, Delivery, QState0) -> - Seq = Delivery#delivery.msg_seq_no, - case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, - Delivery#delivery.message, QState0) of - {ok, _} = Res -> Res; - {slow, _} = Res -> Res; - {reject_publish, State} -> - %% TODO: this works fine but once the queue types interface is in - %% place it could be replaced with an action or similar to avoid - %% self publishing messages. - gen_server2:cast(self(), {reject_publish, Seq, undefined}), - {ok, State} - end. + rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no, + Delivery#delivery.message, QState0). deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> lists:foldl( @@ -765,8 +754,14 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) -> [QRef], Delivery#delivery.message), {Qs, Actions}; ({Q, S0}, {Qs, Actions}) -> - {_, S} = deliver(Confirm, Delivery, S0), - {[{Q, S} | Qs], Actions} + case deliver(Confirm, Delivery, S0) of + {reject_publish, S} -> + Seq = Delivery#delivery.msg_seq_no, + QName = rabbit_fifo_client:cluster_name(S), + {[{Q, S} | Qs], [{rejected, QName, [Seq]} | Actions]}; + {_, S} -> + {[{Q, S} | Qs], Actions} + end end, {[], []}, QSs). @@ -1423,7 +1418,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg). check_invalid_arguments(QueueName, Args) -> Keys = [<<"x-message-ttl">>, - <<"x-max-priority">>, <<"x-queue-mode">>], + <<"x-max-priority">>, + <<"x-queue-mode">>], rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys). queue_name(RaFifoState) -> @@ -1487,6 +1483,10 @@ update_type_state(Q, Fun) when ?is_amqqueue(Q) -> Ts = amqqueue:get_type_state(Q), amqqueue:set_type_state(Q, Fun(Ts)). -overflow(undefined, Def) -> Def; -overflow(<<"reject-publish">>, _Def) -> reject_publish; -overflow(<<"drop-head">>, _Def) -> drop_head. +overflow(undefined, Def, _QName) -> Def; +overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish; +overflow(<<"drop-head">>, _Def, _QName) -> drop_head; +overflow(<<"reject-publish-dlx">> = V, _Def, QName) -> + rabbit_misc:protocol_error(precondition_failed, + "invalid overflow value '~s' for ~s", + [V, rabbit_misc:rs(QName)]). |
