summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorkjnilsson <knilsson@pivotal.io>2020-10-02 13:37:25 +0100
committerkjnilsson <knilsson@pivotal.io>2020-10-02 13:37:25 +0100
commit1f8e23516e1e7dc0d930379ac2f0d05e10412a0d (patch)
tree5dd0fe1993c733119cbc8a8a6b4e5101e768f2bc /src
parentbf57eeecce34d7d4d371c0e47d42f3dfe26d9fa4 (diff)
downloadrabbitmq-server-git-1f8e23516e1e7dc0d930379ac2f0d05e10412a0d.tar.gz
Quorum Queue reject publish bug fix
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl29
-rw-r--r--src/rabbit_fifo_client.erl2
-rw-r--r--src/rabbit_quorum_queue.erl40
3 files changed, 34 insertions, 37 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 0887591ad5..b5c4ca8ca0 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -2076,24 +2076,22 @@ deliver_to_queues({Delivery = #delivery{message = Message = #basic_message{
msg_seq_no = MsgSeqNo},
DelQNames}, State0 = #ch{queue_states = QueueStates0}) ->
Qs = rabbit_amqqueue:lookup(DelQNames),
+ AllQueueNames = lists:foldl(fun (Q, Acc) ->
+ QRef = amqqueue:get_name(Q),
+ [QRef | Acc]
+ end, [], Qs),
{ok, QueueStates, Actions} =
rabbit_queue_type:deliver(Qs, Delivery, QueueStates0),
- State1 = handle_queue_actions(Actions,
- State0#ch{queue_states = QueueStates}),
%% NB: the order here is important since basic.returns must be
%% sent before confirms.
- %% TODO: fix - HACK TO WORK OUT ALL QREFS
- AllDeliveredQRefs = lists:foldl(fun (Q, Acc) ->
- QRef = amqqueue:get_name(Q),
- [QRef | Acc]
- end, [], Qs),
- ok = process_routing_mandatory(Mandatory, AllDeliveredQRefs,
- Message, State1),
- State = process_routing_confirm(Confirm,
- AllDeliveredQRefs,
- MsgSeqNo,
- XName, State1),
- case rabbit_event:stats_level(State1, #ch.stats_timer) of
+ ok = process_routing_mandatory(Mandatory, Qs, Message, State0),
+ State1 = process_routing_confirm(Confirm, AllQueueNames,
+ MsgSeqNo, XName, State0),
+ %% Actions must be processed after registering confirms as actions may
+ %% contain rejections of publishes
+ State = handle_queue_actions(Actions,
+ State1#ch{queue_states = QueueStates}),
+ case rabbit_event:stats_level(State, #ch.stats_timer) of
fine ->
?INCR_STATS(exchange_stats, XName, 1, publish),
[?INCR_STATS(queue_exchange_stats,
@@ -2132,7 +2130,6 @@ confirm(MsgSeqNos, QRef, State = #ch{unconfirmed = UC}) ->
{ConfirmMXs, UC1} = rabbit_confirms:confirm(MsgSeqNos, QRef, UC),
%% NB: don't call noreply/1 since we don't want to send confirms.
record_confirms(ConfirmMXs, State#ch{unconfirmed = UC1}).
- % record_rejects(RejectMXs, State1).
send_confirms_and_nacks(State = #ch{tx = none, confirmed = [], rejected = []}) ->
State;
@@ -2700,7 +2697,7 @@ handle_queue_actions(Actions, #ch{} = State0) ->
{ok, MX, U2} ->
{U2, [MX | Acc]};
{error, not_found} ->
- Acc
+ {U1, Acc}
end
end, {S0#ch.unconfirmed, []}, MsgSeqNos),
S = S0#ch{unconfirmed = U},
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index a5695fbfcb..050b5afa1f 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -602,7 +602,7 @@ handle_ra_event(From, {machine, {delivery, _ConsumerTag, _} = Del}, State0) ->
handle_ra_event(_, {machine, {queue_status, Status}},
#state{} = State) ->
%% just set the queue status
- {internal, [], [], State#state{queue_status = Status}};
+ {ok, State#state{queue_status = Status}, []};
handle_ra_event(Leader, {machine, leader_change},
#state{leader = Leader} = State) ->
%% leader already known
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index eb02333e43..1f333bdae1 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -203,7 +203,7 @@ ra_machine_config(Q) when ?is_amqqueue(Q) ->
max_in_memory_bytes => MaxMemoryBytes,
single_active_consumer_on => single_active_consumer_on(Q),
delivery_limit => DeliveryLimit,
- overflow_strategy => overflow(Overflow, drop_head),
+ overflow_strategy => overflow(Overflow, drop_head, QName),
created => erlang:system_time(millisecond),
expires => Expires
}.
@@ -735,7 +735,6 @@ stateless_deliver(ServerId, Delivery) ->
rabbit_fifo_client:state()) ->
{ok | slow, rabbit_fifo_client:state()} |
{reject_publish, rabbit_fifo_client:state()}.
-
deliver(false, Delivery, QState0) ->
case rabbit_fifo_client:enqueue(Delivery#delivery.message, QState0) of
{ok, _} = Res -> Res;
@@ -744,18 +743,8 @@ deliver(false, Delivery, QState0) ->
{ok, State}
end;
deliver(true, Delivery, QState0) ->
- Seq = Delivery#delivery.msg_seq_no,
- case rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
- Delivery#delivery.message, QState0) of
- {ok, _} = Res -> Res;
- {slow, _} = Res -> Res;
- {reject_publish, State} ->
- %% TODO: this works fine but once the queue types interface is in
- %% place it could be replaced with an action or similar to avoid
- %% self publishing messages.
- gen_server2:cast(self(), {reject_publish, Seq, undefined}),
- {ok, State}
- end.
+ rabbit_fifo_client:enqueue(Delivery#delivery.msg_seq_no,
+ Delivery#delivery.message, QState0).
deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
lists:foldl(
@@ -765,8 +754,14 @@ deliver(QSs, #delivery{confirm = Confirm} = Delivery) ->
[QRef], Delivery#delivery.message),
{Qs, Actions};
({Q, S0}, {Qs, Actions}) ->
- {_, S} = deliver(Confirm, Delivery, S0),
- {[{Q, S} | Qs], Actions}
+ case deliver(Confirm, Delivery, S0) of
+ {reject_publish, S} ->
+ Seq = Delivery#delivery.msg_seq_no,
+ QName = rabbit_fifo_client:cluster_name(S),
+ {[{Q, S} | Qs], [{rejected, QName, [Seq]} | Actions]};
+ {_, S} ->
+ {[{Q, S} | Qs], Actions}
+ end
end, {[], []}, QSs).
@@ -1423,7 +1418,8 @@ maybe_send_reply(ChPid, Msg) -> ok = rabbit_channel:send_command(ChPid, Msg).
check_invalid_arguments(QueueName, Args) ->
Keys = [<<"x-message-ttl">>,
- <<"x-max-priority">>, <<"x-queue-mode">>],
+ <<"x-max-priority">>,
+ <<"x-queue-mode">>],
rabbit_queue_type_util:check_invalid_arguments(QueueName, Args, Keys).
queue_name(RaFifoState) ->
@@ -1487,6 +1483,10 @@ update_type_state(Q, Fun) when ?is_amqqueue(Q) ->
Ts = amqqueue:get_type_state(Q),
amqqueue:set_type_state(Q, Fun(Ts)).
-overflow(undefined, Def) -> Def;
-overflow(<<"reject-publish">>, _Def) -> reject_publish;
-overflow(<<"drop-head">>, _Def) -> drop_head.
+overflow(undefined, Def, _QName) -> Def;
+overflow(<<"reject-publish">>, _Def, _QName) -> reject_publish;
+overflow(<<"drop-head">>, _Def, _QName) -> drop_head;
+overflow(<<"reject-publish-dlx">> = V, _Def, QName) ->
+ rabbit_misc:protocol_error(precondition_failed,
+ "invalid overflow value '~s' for ~s",
+ [V, rabbit_misc:rs(QName)]).