diff options
Diffstat (limited to 'src')
| -rw-r--r-- | src/rabbit_channel.erl | 5 | ||||
| -rw-r--r-- | src/rabbit_fifo_client.erl | 49 |
2 files changed, 29 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl index 74d400950e..1755b4b8e2 100644 --- a/src/rabbit_channel.erl +++ b/src/rabbit_channel.erl @@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason, record_rejects(RejectMXs, State1) end end; -handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) -> - error(quorum_queues_should_never_be_monitored). +handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) -> + %% this should never happen after the queue type refactoring in 3.9 + State. handle_consuming_queue_down_or_eol(QRef, State = #ch{queue_consumers = QCons, diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl index 83207d7bf9..9a6cd32a7b 100644 --- a/src/rabbit_fifo_client.erl +++ b/src/rabbit_fifo_client.erl @@ -144,29 +144,32 @@ enqueue(Correlation, Msg, cfg = #cfg{}} = State0) -> %% it is the first enqueue, check the version {_, Node} = Server = pick_server(State0), - State = - case rpc:call(Node, rabbit_fifo, version, []) of - 0 -> - %% the leader is running the old version - %% so we can't initialize the enqueuer session safely - State0#state{queue_status = go}; - 1 -> - %% were running the new version on the leader do sync initialisation - %% of enqueuer session - Reg = rabbit_fifo:make_register_enqueuer(self()), - case ra:process_command(Server, Reg) of - {ok, reject_publish, _} -> - State0#state{queue_status = reject_publish}; - {ok, ok, _} -> - State0#state{queue_status = go}; - Err -> - exit(Err) - end; - {badrpc, nodedown} -> - rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), - State0#state{queue_status = go} - end, - enqueue(Correlation, Msg, State); + case rpc:call(Node, rabbit_fifo, version, []) of + 0 -> + %% the leader is running the old version + %% so we can't initialize the enqueuer session safely + %% fall back on old behavour + enqueue(Correlation, Msg, State0#state{queue_status = go}); + 1 -> + %% were running the new version on the leader do sync initialisation + %% of enqueuer session + Reg = rabbit_fifo:make_register_enqueuer(self()), + case ra:process_command(Server, Reg) of + {ok, reject_publish, _} -> + {reject_publish, State0#state{queue_status = reject_publish}}; + {ok, ok, _} -> + enqueue(Correlation, Msg, State0#state{queue_status = go}); + {timeout, _} -> + %% if we timeout it is probably better to reject + %% the message than being uncertain + {reject_publish, State0}; + Err -> + exit(Err) + end; + {badrpc, nodedown} -> + rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]), + State0#state{queue_status = go} + end; enqueue(_Correlation, _Msg, #state{queue_status = reject_publish, cfg = #cfg{}} = State) -> |
