summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/rabbit_channel.erl5
-rw-r--r--src/rabbit_fifo_client.erl49
2 files changed, 29 insertions, 25 deletions
diff --git a/src/rabbit_channel.erl b/src/rabbit_channel.erl
index 74d400950e..1755b4b8e2 100644
--- a/src/rabbit_channel.erl
+++ b/src/rabbit_channel.erl
@@ -1871,8 +1871,9 @@ handle_publishing_queue_down(QPid, Reason,
record_rejects(RejectMXs, State1)
end
end;
-handle_publishing_queue_down(QPid, _Reason, _State) when ?IS_QUORUM(QPid) ->
- error(quorum_queues_should_never_be_monitored).
+handle_publishing_queue_down(QPid, _Reason, State) when ?IS_QUORUM(QPid) ->
+ %% this should never happen after the queue type refactoring in 3.9
+ State.
handle_consuming_queue_down_or_eol(QRef,
State = #ch{queue_consumers = QCons,
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 83207d7bf9..9a6cd32a7b 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -144,29 +144,32 @@ enqueue(Correlation, Msg,
cfg = #cfg{}} = State0) ->
%% it is the first enqueue, check the version
{_, Node} = Server = pick_server(State0),
- State =
- case rpc:call(Node, rabbit_fifo, version, []) of
- 0 ->
- %% the leader is running the old version
- %% so we can't initialize the enqueuer session safely
- State0#state{queue_status = go};
- 1 ->
- %% were running the new version on the leader do sync initialisation
- %% of enqueuer session
- Reg = rabbit_fifo:make_register_enqueuer(self()),
- case ra:process_command(Server, Reg) of
- {ok, reject_publish, _} ->
- State0#state{queue_status = reject_publish};
- {ok, ok, _} ->
- State0#state{queue_status = go};
- Err ->
- exit(Err)
- end;
- {badrpc, nodedown} ->
- rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
- State0#state{queue_status = go}
- end,
- enqueue(Correlation, Msg, State);
+ case rpc:call(Node, rabbit_fifo, version, []) of
+ 0 ->
+ %% the leader is running the old version
+ %% so we can't initialize the enqueuer session safely
+ %% fall back on old behavour
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ 1 ->
+ %% were running the new version on the leader do sync initialisation
+ %% of enqueuer session
+ Reg = rabbit_fifo:make_register_enqueuer(self()),
+ case ra:process_command(Server, Reg) of
+ {ok, reject_publish, _} ->
+ {reject_publish, State0#state{queue_status = reject_publish}};
+ {ok, ok, _} ->
+ enqueue(Correlation, Msg, State0#state{queue_status = go});
+ {timeout, _} ->
+ %% if we timeout it is probably better to reject
+ %% the message than being uncertain
+ {reject_publish, State0};
+ Err ->
+ exit(Err)
+ end;
+ {badrpc, nodedown} ->
+ rabbit_log:info("rabbit_fifo_client: badrpc for node ~w", [Node]),
+ State0#state{queue_status = go}
+ end;
enqueue(_Correlation, _Msg,
#state{queue_status = reject_publish,
cfg = #cfg{}} = State) ->