summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorMichael Klishin <mklishin@pivotal.io>2019-11-13 17:39:50 +0300
committerGitHub <noreply@github.com>2019-11-13 17:39:50 +0300
commit6ce30e8c9b2cd3fe101228ffea230e2f28e8eb91 (patch)
treed83c9bd5bdcb39ff8c05f2234084f81093c71b50
parent2b3c42ef2ab91af851935eb55baebcab973a5203 (diff)
parent1b683c81a3bf26eeea238e38ca6b54ce4856858e (diff)
downloadrabbitmq-server-git-6ce30e8c9b2cd3fe101228ffea230e2f28e8eb91.tar.gz
Merge pull request #2154 from rabbitmq/improve-error-handling-qq
Improve error handling and logging around quorum queues
-rw-r--r--src/rabbit_amqqueue.erl20
-rw-r--r--src/rabbit_fifo_client.erl14
-rw-r--r--src/rabbit_quorum_queue.erl93
3 files changed, 83 insertions, 44 deletions
diff --git a/src/rabbit_amqqueue.erl b/src/rabbit_amqqueue.erl
index 5654986ce5..936b18c53a 100644
--- a/src/rabbit_amqqueue.erl
+++ b/src/rabbit_amqqueue.erl
@@ -1582,13 +1582,19 @@ basic_consume(Q,
QName = amqqueue:get_name(Q),
ok = check_consume_arguments(QName, Args),
QState0 = get_quorum_state(Id, QName, QStates),
- {ok, QState} = rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
- ConsumerPrefetchCount,
- ConsumerTag,
- ExclusiveConsume, Args,
- ActingUser,
- OkMsg, QState0),
- {ok, maps:put(Name, QState, QStates)}.
+ case rabbit_quorum_queue:basic_consume(Q, NoAck, ChPid,
+ ConsumerPrefetchCount,
+ ConsumerTag,
+ ExclusiveConsume, Args,
+ ActingUser,
+ OkMsg, QState0) of
+ {ok, QState} ->
+ {ok, maps:put(Name, QState, QStates)};
+ {error, Reason} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot consume a message from quorum queue '~s': ~w",
+ [rabbit_misc:rs(QName), Reason])
+ end.
-spec basic_cancel
(amqqueue:amqqueue(), pid(), rabbit_types:ctag(), any(),
diff --git a/src/rabbit_fifo_client.erl b/src/rabbit_fifo_client.erl
index 46064a52dc..c85dd7266a 100644
--- a/src/rabbit_fifo_client.erl
+++ b/src/rabbit_fifo_client.erl
@@ -713,8 +713,18 @@ get_missing_deliveries(Leader, From, To, ConsumerTag) ->
Query = fun (State) ->
rabbit_fifo:get_checked_out(ConsumerId, From, To, State)
end,
- {ok, {_, Missing}, _} = ra:local_query(Leader, Query),
- Missing.
+ case ra:local_query(Leader, Query) of
+ {ok, {_, Missing}, _} ->
+ Missing;
+ {error, Error} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot query missing deliveries from ~p: ~p",
+ [Leader, Error]);
+ {timeout, _} ->
+ rabbit_misc:protocol_error(internal_error,
+ "Cannot query missing deliveries from ~p: timeout",
+ [Leader])
+ end.
pick_node(#state{leader = undefined, servers = [N | _]}) ->
%% TODO: pick random rather that first?
diff --git a/src/rabbit_quorum_queue.erl b/src/rabbit_quorum_queue.erl
index 33bae0919f..75857f81a8 100644
--- a/src/rabbit_quorum_queue.erl
+++ b/src/rabbit_quorum_queue.erl
@@ -519,25 +519,31 @@ basic_consume(Q, NoAck, ChPid,
Prefetch,
ConsumerMeta,
QState0),
- {ok, {_, SacResult}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1),
-
- SingleActiveConsumerOn = single_active_consumer_on(Q),
- {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
- {false, _} ->
- {true, up};
- {true, {value, {ConsumerTag, ChPid}}} ->
- {true, single_active};
- _ ->
- {false, waiting}
- end,
-
- %% TODO: emit as rabbit_fifo effect
- rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
- not NoAck, QName,
- ConsumerPrefetchCount, IsSingleActiveConsumer,
- ActivityStatus, Args),
- {ok, QState}.
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1) of
+ {ok, {_, SacResult}, _} ->
+
+ SingleActiveConsumerOn = single_active_consumer_on(Q),
+ {IsSingleActiveConsumer, ActivityStatus} = case {SingleActiveConsumerOn, SacResult} of
+ {false, _} ->
+ {true, up};
+ {true, {value, {ConsumerTag, ChPid}}} ->
+ {true, single_active};
+ _ ->
+ {false, waiting}
+ end,
+
+ %% TODO: emit as rabbit_fifo effect
+ rabbit_core_metrics:consumer_created(ChPid, ConsumerTag, ExclusiveConsume,
+ not NoAck, QName,
+ ConsumerPrefetchCount, IsSingleActiveConsumer,
+ ActivityStatus, Args),
+ {ok, QState};
+ {error, Error} ->
+ Error;
+ {timeout, _} ->
+ {error, timeout}
+ end.
-spec basic_cancel(rabbit_types:ctag(), ChPid :: pid(), any(), rabbit_fifo_client:state()) ->
{'ok', rabbit_fifo_client:state()}.
@@ -1052,35 +1058,52 @@ i(open_files, Q) when ?is_amqqueue(Q) ->
lists:flatten(Data);
i(single_active_consumer_pid, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, SacResult}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1),
- case SacResult of
- {value, {_ConsumerTag, ChPid}} ->
+ case ra:local_query(QPid, fun rabbit_fifo:query_single_active_consumer/1) of
+ {ok, {_, {value, {_ConsumerTag, ChPid}}}, _} ->
ChPid;
- _ ->
+ {ok, _, _} ->
+ '';
+ {error, _} ->
+ '';
+ {timeout, _} ->
''
end;
i(single_active_consumer_ctag, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, SacResult}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_single_active_consumer/1),
- case SacResult of
- {value, {ConsumerTag, _ChPid}} ->
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_single_active_consumer/1) of
+ {ok, {_, {value, {ConsumerTag, _ChPid}}}, _} ->
ConsumerTag;
- _ ->
+ {ok, _, _} ->
+ '';
+ {error, _} ->
+ '';
+ {timeout, _} ->
''
end;
i(type, _) -> quorum;
i(messages_ram, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, {Length, _}}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1),
- Length;
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1) of
+ {ok, {_, {Length, _}}, _} ->
+ Length;
+ {error, _} ->
+ 0;
+ {timeout, _} ->
+ 0
+ end;
i(message_bytes_ram, Q) when ?is_amqqueue(Q) ->
QPid = amqqueue:get_pid(Q),
- {ok, {_, {_, Bytes}}, _} = ra:local_query(QPid,
- fun rabbit_fifo:query_in_memory_usage/1),
- Bytes;
+ case ra:local_query(QPid,
+ fun rabbit_fifo:query_in_memory_usage/1) of
+ {ok, {_, {_, Bytes}}, _} ->
+ Bytes;
+ {error, _} ->
+ 0;
+ {timeout, _} ->
+ 0
+ end;
i(_K, _Q) -> ''.
open_files(Name) ->